Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
test_platform
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Wallen姚文辉
test_platform
Commits
834651de
Commit
834651de
authored
Apr 01, 2025
by
Wallen姚文辉
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
新增邮件回复功能
parent
ffa65ccf
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
78 additions
and
7 deletions
+78
-7
Tool.py
controller/Tool.py
+78
-7
No files found.
controller/Tool.py
View file @
834651de
import
email.parser
import
re
import
time
import
uuid
...
...
@@ -17,6 +18,10 @@ from flask_socketio import emit,send
from
sqlalchemy.sql
import
alias
from
sqlalchemy
import
func
import
yagmail
import
email
from
imapclient
import
IMAPClient
import
datetime
from
base64
import
b64decode
db
=
server
.
db
tool
=
Blueprint
(
"tools"
,
__name__
,
url_prefix
=
'/tool'
)
...
...
@@ -35,7 +40,26 @@ def getheader(id_):
}
return
headers
def
decode_mime
(
msg
):
#此处封装的目的是在循环处理MIME数据块时重复利用代码(找不到这段代码的出处了,日后找到再补上出处。)
res
=
''
if
msg
.
is_multipart
():
#如果 MIME数据的格式是multi-part
parts
=
msg
.
get_payload
()
for
part
in
parts
:
#则遍历挨个解析
res
+=
decode_mime
(
part
)
#拆分出的单个数据块再次传入自定义函数decode_mime(相当于创建了一个新的实例,与当前的循环不冲突)
return
res
else
:
#处理每一个MIME数据块
str_content_type
=
msg
.
get_content_type
()
#当前数据块的Content-Type
#print('str_content_type=%s'%str_content_type)
str_charset
=
msg
.
get_content_charset
(
failobj
=
None
)
#当前数据块的编码信息
#print('str_charset=',str_charset)
if
str_content_type
in
(
'text/plain'
,
'text/html'
):
#这里只处理纯文本、html两类数据,关于附件和其他类型数据的处理方法,请继续询问度娘
bytes_content
=
msg
.
get_payload
(
decode
=
True
)
#读取当前数据块中的正文内容,返回bytes
return
res
+
bytes_content
.
decode
(
str_charset
)
#解码
else
:
return
''
@
tool
.
route
(
'/testport/sendport'
,
methods
=
[
"POST"
])
def
sendport
():
data
=
request
.
json
...
...
@@ -85,14 +109,59 @@ def iteration(view):
else
:
return
jsonify
({
"code"
:
402
,
"message"
:
"未找到相关项目,请确认jira信息是否正确"
}),
402
@
tool
.
route
(
'/testport/getMail'
,
methods
=
[
"GET"
])
def
getMail
():
date_
=
request
.
args
.
get
(
'date'
)
imap
=
IMAPClient
(
'imap.exmail.qq.com'
)
myinfo
=
db
.
session
.
query
(
User
)
.
filter_by
(
id
=
session
.
get
(
"id"
))
.
first
()
db
.
session
.
close
()
# print(my)
try
:
imap
.
login
(
myinfo
.
email_address
,
myinfo
.
email_password
)
except
:
return
jsonify
({
"code"
:
501
,
"message"
:
"请打开当前账户绑定邮箱的imap功能"
}),
501
my_date
=
datetime
.
datetime
.
date
(
datetime
.
datetime
.
strptime
(
date_
,
"
%
Y-
%
m-
%
d"
))
my_date_1
=
datetime
.
datetime
.
date
(
datetime
.
datetime
.
strptime
(
date_
,
"
%
Y-
%
m-
%
d"
))
+
datetime
.
timedelta
(
days
=
1
)
imap
.
select_folder
(
"INBOX"
)
messages_ids
=
imap
.
search
([
u'SINCE'
,
my_date
,
u'BEFORE'
,
my_date_1
])
result
=
[]
for
i
in
messages_ids
:
info
=
imap
.
fetch
(
i
,[
'ENVELOPE'
])
sub
=
str
(
b64decode
(
info
[
i
][
b
'ENVELOPE'
]
.
subject
.
decode
(
'utf-8'
)
.
replace
(
'=?GBK?B?'
,
''
)
.
replace
(
'?='
,
''
)),
'GBK'
)
from_
=
str
(
b64decode
(
info
[
i
][
b
'ENVELOPE'
]
.
from_
[
0
]
.
name
.
decode
(
'utf-8'
)
.
replace
(
'=?GBK?B?'
,
''
)
.
replace
(
'?='
,
''
)),
'GBK'
)
date_
=
info
[
i
][
b
'ENVELOPE'
]
.
date
date_
=
datetime
.
datetime
.
strftime
(
date_
,
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
)
result
.
append
({
"id"
:
i
,
"sub"
:
sub
,
"from"
:
from_
,
"receive_time"
:
date_
})
imap
.
logout
()
return
jsonify
({
"code"
:
200
,
"data"
:
result
}),
200
@
tool
.
route
(
'/testport/getMailDeatil/<id_>'
,
methods
=
[
"GET"
])
def
getMailDeatil
(
id_
):
imap
=
IMAPClient
(
'imap.exmail.qq.com'
)
myinfo
=
db
.
session
.
query
(
User
)
.
filter_by
(
id
=
session
.
get
(
"id"
))
.
first
()
db
.
session
.
close
()
try
:
imap
.
login
(
myinfo
.
email_address
,
myinfo
.
email_password
)
except
:
return
jsonify
({
"code"
:
501
,
"message"
:
"请打开当前账户绑定邮箱的imap功能"
}),
501
imap
.
select_folder
(
"INBOX"
)
print
(
id_
)
info
=
email
.
parser
.
Parser
()
.
parsestr
(
imap
.
fetch
(
int
(
id_
),
'BODY[]'
)[
int
(
id_
)][
b
'BODY[]'
]
.
decode
(
'utf-8'
))
result
=
''
for
i
in
info
.
walk
():
# print(i["content-type"])
if
(
'text/html'
in
i
[
"content-type"
]):
result
=
i
.
get_payload
(
decode
=
True
)
.
decode
(
i
.
get_content_charset
())
if
(
'text/plain'
in
i
[
"content-type"
])
and
not
result
:
result
=
i
.
get_payload
(
decode
=
True
)
.
decode
(
i
.
get_content_charset
())
imap
.
logout
()
return
jsonify
({
"code"
:
200
,
"data"
:
result
}),
200
@
tool
.
route
(
'/testport/reportinfo'
,
methods
=
[
"GET"
])
def
reportinfo
():
# project=request.args.get("project")
iteration
=
request
.
args
.
get
(
"iteration"
)
id_
=
request
.
args
.
get
(
"view"
)
print
headers
=
getheader
(
session
.
get
(
"id"
))
# id_=requests.request("get",jiraAddress+"/rest/agile/1.0/board?projectKeyOrId="+project,headers=headers).json().get("values")[0].get("id")
result
=
requests
.
request
(
"get"
,
jiraAddress
+
"/rest/greenhopper/1.0/xboard/plan/backlog/data.json?rapidViewId="
+
str
(
id_
),
headers
=
headers
)
.
json
()
...
...
@@ -296,7 +365,7 @@ def runstart(app,file_name,project,view,upid,user_id):
requests
.
request
(
"post"
,
jiraAddress
+
"/secure/CreateTestSuite.jspa?projectId="
+
str
(
projectId
),
data
=
{
"inline"
:
True
,
"decorator"
:
"dialog"
,
"testSuiteName"
:
suiteName
},
headers
=
headers
)
text
=
requests
.
request
(
"get"
,
jiraAddress
+
"/secure/ShowLinkTestSuiteInPanel.jspa"
,
params
=
params
,
headers
=
headers
)
.
content
soup
=
BeautifulSoup
(
text
,
"html.parser"
)
suiteId
=
soup
.
find
(
"strong"
,
string
=
suiteName
)
.
parent
.
find
(
"input"
)
.
attrs
.
get
(
"value"
)
suiteId
=
soup
.
find
(
"strong"
,
string
=
suiteName
.
strip
()
)
.
parent
.
find
(
"input"
)
.
attrs
.
get
(
"value"
)
if
type
==
"name"
:
string
=
each
.
get
(
"title"
)
if
"tc:"
in
each
.
get
(
"title"
)
or
"tc:"
in
each
.
get
(
"title"
)
or
"tc:"
in
each
.
get
(
"title"
):
...
...
@@ -319,9 +388,10 @@ def runstart(app,file_name,project,view,upid,user_id):
"name"
:
"测试用例"
},
}
},
headers
=
headers
)
.
json
()[
"id"
]
requests
.
request
(
"post"
,
jiraAddress
+
"/rest/synapse/1.0/testStep/addTestStep"
,
json
=
{
"tcId"
:
testId
,
"step"
:
oldinfo
.
get
(
"step"
),
"expectedResult"
:
oldinfo
.
get
(
"expect"
),
"sequenceNumber"
:
None
},
headers
=
headers
)
requests
.
request
(
"post"
,
jiraAddress
+
"/rest/synapse/latest/testSuite/addTestCaseMemberToTestSuites"
,
json
=
{
"testSuiteIds"
:[
suiteId
],
"testCaseId"
:
testId
},
headers
=
headers
)
},
headers
=
headers
)
.
json
()
.
get
(
"id"
)
if
testId
:
requests
.
request
(
"post"
,
jiraAddress
+
"/rest/synapse/1.0/testStep/addTestStep"
,
json
=
{
"tcId"
:
testId
,
"step"
:
oldinfo
.
get
(
"step"
),
"expectedResult"
:
oldinfo
.
get
(
"expect"
),
"sequenceNumber"
:
None
},
headers
=
headers
)
requests
.
request
(
"post"
,
jiraAddress
+
"/rest/synapse/latest/testSuite/addTestCaseMemberToTestSuites"
,
json
=
{
"testSuiteIds"
:[
suiteId
],
"testCaseId"
:
testId
},
headers
=
headers
)
sums
=
sums
+
1
return
sums
...
...
@@ -347,6 +417,7 @@ def runstart(app,file_name,project,view,upid,user_id):
db
.
session
.
commit
()
server
.
socketio
.
emit
(
"infoupdate"
,
room
=
a
.
id
)
content
=
xmindparser
.
xmind_to_dict
(
par_path
+
"/uploadfile/"
+
file_name
)
sums
=
0
try
:
sums
=
up
(
None
,
None
,
content
,{},
"name"
)
requests
.
request
(
"delete"
,
jiraAddress
+
"/rest/api/2/issue/"
+
globalcaseId
,
headers
=
headers
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment