Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
T
test_platform
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Wallen姚文辉
test_platform
Commits
7042130b
Commit
7042130b
authored
Jun 26, 2024
by
Wallen姚文辉
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
暂存加审核
parent
45cbba8f
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
130 additions
and
34 deletions
+130
-34
Tool.py
controller/Tool.py
+130
-34
No files found.
controller/Tool.py
View file @
7042130b
...
...
@@ -4,7 +4,7 @@ import time
import
uuid
from
flask
import
Blueprint
,
request
,
jsonify
,
session
,
current_app
from
manager.serverCenter
import
server
from
model.Model
import
User
,
Emails
,
Uploadecase
from
model.Model
import
User
,
Emails
,
Uploadecase
,
Testreport
,
Audituser
import
base64
import
requests
from
bs4
import
BeautifulSoup
...
...
@@ -14,6 +14,8 @@ from copy import deepcopy
from
manager.tools
import
par_path
from
threading
import
Thread
from
flask_socketio
import
emit
,
send
from
sqlalchemy.sql
import
alias
from
sqlalchemy
import
func
db
=
server
.
db
tool
=
Blueprint
(
"tools"
,
__name__
,
url_prefix
=
'/tool'
)
...
...
@@ -134,7 +136,7 @@ def reportinfo():
continue
elif
"【前端】"
in
m
.
get
(
"summary"
):
peoples
[
"qian"
]
.
append
(
k
)
elif
"【后端】"
in
m
.
get
(
"summary"
):
elif
"【后端】"
in
m
.
get
(
"summary"
)
or
"【算法】"
in
m
.
get
(
"summary"
)
:
peoples
[
"hou"
]
.
append
(
k
)
elif
(
"测试"
in
m
.
get
(
"summary"
)
or
"用例"
in
m
.
get
(
"summary"
)):
peoples
[
"test"
]
.
append
(
k
)
...
...
@@ -428,36 +430,6 @@ def test():
a
=
server
.
auto_jenkins
.
get_job_info
(
'app自动化测试'
,
8
)
return
jsonify
({
"code"
:
200
,
"message"
:
"请求成功"
,
"data"
:
a
}),
200
# @tool.route('/testport/getapitestinfo', methods=["POST"])
# def getapitestinfo():
# info=request.json.get("data")
# try:
# runinfo={"运行列表":[]}
# soup=BeautifulSoup(info,"html.parser")
# a=soup.find(class_="container")
# path=lambda x:f'div[class="card"]>div[class="card-body"]>div[class="row"]>div:-soup-contains("{x}")+div'
# onlyinfolist=["测试场景","运行时间","总耗时","总返回数据","接口请求耗时","平均接口请求耗时","通过率","失败率","未测率"]
# twoinfolist=["循环数","HTTP 接口请求数","断言数"]
# [runinfo.update({i:soup.select(path(i))[0].text}) for i in onlyinfolist]
# [runinfo.update({i:[soup.select(path(i))[0].text,soup.select(path(i)+'+div')[0].text]}) for i in twoinfolist]
# for each in a.select("div[class='']>div[class='collapse']>div[class='']"):
# this={"step":[]}
# k=each.find_all('div',recursive=False)
# this["name"]=k[0].text.strip()
# for item in k[1].find_all('div',recursive=False):
# stepinfo={"断言详情":[]}
# stepinfo["name"]=item.find('h5').text.strip()
# onlyinfolist=["Method","URL","耗时","返回数据","HTTP 状态码","断言通过数","断言失败数"]
# path=lambda x:f'div[class="card-body"]>div>div[class="row"]>div:-soup-contains("{x}")+div'
# [stepinfo.update({i:item.select(path(i))[0].text}) for i in onlyinfolist]
# stepinfo["断言详情"].append(list(map(lambda x:x.text.strip(),item.select('div[class="card-body"]>div>div[class="row"]>div:-soup-contains("断言详情")+div>table>tbody>tr>td'))))
# this["step"].append(stepinfo)
# runinfo["运行列表"].append(this)
# return jsonify({"code": 200, "message": "请求成功","data":runinfo}),200
# except:
# return jsonify({"code": 403, "message": "文件内容有误,请确认"}),403
def
updateapitoken
():
user
,
password
=
'wallen.ywh@galaxyoversea.com'
,
'ywh940509'
global
apifox_token
...
...
@@ -486,7 +458,131 @@ def getapitestinfo():
return
jsonify
({
"code"
:
503
,
"message"
:
"读取项目权限不够,请联系开发配置访问账号权限"
}),
503
@
tool
.
route
(
'/testport/staging'
,
methods
=
[
"POST"
])
def
staging
():
info
=
request
.
json
.
get
(
"report_data"
)
if
not
info
:
return
jsonify
({
"code"
:
401
,
"message"
:
"缺少存储数据"
}),
401
create_time
=
time
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
,
time
.
localtime
(
time
.
time
()))
if
db
.
session
.
query
(
Testreport
)
.
filter
(
Testreport
.
create_user
==
session
.
get
(
"id"
),
Testreport
.
report_data
==
info
)
.
count
()
>
0
:
db
.
session
.
close
()
return
jsonify
({
"code"
:
503
,
"message"
:
"已存储相同数据"
}),
503
insert
=
Testreport
(
create_time
=
create_time
,
update_time
=
create_time
,
create_user
=
session
.
get
(
"id"
),
project
=
request
.
json
.
get
(
"project"
),
story
=
request
.
json
.
get
(
"sprint"
),
save_remark
=
request
.
json
.
get
(
"save_remark"
),
type
=
request
.
json
.
get
(
"type"
),
report_status
=
request
.
json
.
get
(
"report_status"
),
report_data
=
request
.
json
.
get
(
"report_data"
))
db
.
session
.
add
(
insert
)
db
.
session
.
commit
()
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"保存成功"
}),
200
@
tool
.
route
(
'/testport/updatereport'
,
methods
=
[
"POST"
])
def
updatereport
():
if
not
request
.
json
.
get
(
"report_data"
):
return
jsonify
({
"code"
:
401
,
"message"
:
"缺少存储数据"
}),
401
updateinfo
=
request
.
json
updateinfo
.
update
({
"update_time"
:
time
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S'
,
time
.
localtime
(
time
.
time
()))})
updateinfo
[
"story"
]
=
updateinfo
[
"sprint"
]
del
updateinfo
[
"sprint"
]
db
.
session
.
query
(
Testreport
)
.
filter_by
(
id
=
request
.
json
.
get
(
"id"
))
.
update
(
updateinfo
)
db
.
session
.
commit
()
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"更新成功"
}),
200
@
tool
.
route
(
'/testport/reportlist'
,
methods
=
[
"GET"
])
def
reportlist
():
data
=
request
.
args
page_size
=
int
(
data
.
get
(
"page_size"
))
page_num
=
int
(
data
.
get
(
"page_num"
))
res
=
db
.
session
.
query
(
Testreport
.
id
,
Testreport
.
project
,
Testreport
.
story
,
Testreport
.
save_remark
,
func
.
date_format
(
Testreport
.
update_time
,
"
%
Y-
%
m-
%
d
%
H:
%
i:
%
s"
)
.
label
(
'update_time'
),
Testreport
.
report_status
)
.
filter
(
Testreport
.
create_user
==
session
.
get
(
"id"
),
Testreport
.
type
==
0
)
if
data
.
get
(
"project"
):
res
=
res
.
filter
(
Testreport
.
project
.
like
(
"
%
"
+
data
.
get
(
"project"
)
+
"
%
"
))
if
data
.
get
(
"story"
):
res
=
res
.
filter
(
Testreport
.
project
.
like
(
"
%
"
+
data
.
get
(
"story"
)
+
"
%
"
))
if
data
.
get
(
"remark"
):
res
=
res
.
filter
(
Testreport
.
project
.
like
(
"
%
"
+
data
.
get
(
"remark"
)
+
"
%
"
))
res
=
res
.
order_by
(
Testreport
.
report_status
.
asc
(),
Testreport
.
update_time
.
desc
())
result
=
res
.
paginate
(
page
=
page_num
,
per_page
=
page_size
)
total_sum
=
result
.
total
res
=
result
.
items
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"请求成功"
,
"data"
:
{
"rows"
:
sqlOrmToJson
(
res
),
"total"
:
total_sum
}})
@
tool
.
route
(
'/testport/reportauditlist'
,
methods
=
[
"GET"
])
def
reportauditlist
():
data
=
request
.
args
page_size
=
int
(
data
.
get
(
"page_size"
))
page_num
=
int
(
data
.
get
(
"page_num"
))
users
=
alias
(
User
,
'users'
)
auditors
=
alias
(
User
,
'auditors'
)
if
(
db
.
session
.
query
(
Audituser
)
.
filter_by
(
user_id
=
session
.
get
(
"id"
))
.
count
()
==
1
):
res
=
db
.
session
.
query
(
Testreport
.
id
,
Testreport
.
project
,
Testreport
.
story
,
Testreport
.
audit_remark
,
func
.
date_format
(
Testreport
.
create_time
,
"
%
Y-
%
m-
%
d
%
H:
%
i:
%
s"
)
.
label
(
'create_time'
),
Testreport
.
create_user
,
users
.
c
.
name
.
label
(
'create_user_name'
),
func
.
date_format
(
Testreport
.
audit_time
,
"
%
Y-
%
m-
%
d
%
H:
%
i:
%
s"
)
.
label
(
'audit_time'
),
Testreport
.
audit_user
,
auditors
.
c
.
name
.
label
(
'audit_user_name'
),
Testreport
.
audit_status
,
Testreport
.
report_status
)
.
join
(
users
,
users
.
c
.
id
==
Testreport
.
create_user
,
isouter
=
True
)
.
join
(
auditors
,
auditors
.
c
.
id
==
Testreport
.
audit_user
,
isouter
=
True
)
.
filter
(
Testreport
.
type
==
1
)
else
:
res
=
db
.
session
.
query
(
Testreport
.
id
,
Testreport
.
project
,
Testreport
.
story
,
Testreport
.
audit_remark
,
func
.
date_format
(
Testreport
.
create_time
,
"
%
Y-
%
m-
%
d
%
H:
%
i:
%
s"
)
.
label
(
'create_time'
),
Testreport
.
create_user
,
users
.
c
.
name
.
label
(
'create_user_name'
),
func
.
date_format
(
Testreport
.
audit_time
,
"
%
Y-
%
m-
%
d
%
H:
%
i:
%
s"
)
.
label
(
'audit_time'
),
Testreport
.
audit_user
,
auditors
.
c
.
name
.
label
(
'audit_user_name'
),
Testreport
.
audit_status
,
Testreport
.
report_status
)
.
join
(
users
,
users
.
c
.
id
==
Testreport
.
create_user
,
isouter
=
True
)
.
join
(
auditors
,
auditors
.
c
.
id
==
Testreport
.
audit_user
,
isouter
=
True
)
.
filter
(
Testreport
.
create_user
==
session
.
get
(
"id"
),
Testreport
.
type
==
1
)
if
data
.
get
(
"project"
):
res
=
res
.
filter
(
Testreport
.
project
.
like
(
"
%
"
+
data
.
get
(
"project"
)
+
"
%
"
))
if
data
.
get
(
"story"
):
res
=
res
.
filter
(
Testreport
.
project
.
like
(
"
%
"
+
data
.
get
(
"story"
)
+
"
%
"
))
if
data
.
get
(
"remark"
):
res
=
res
.
filter
(
Testreport
.
project
.
like
(
"
%
"
+
data
.
get
(
"remark"
)
+
"
%
"
))
res
=
res
.
order_by
(
Testreport
.
audit_status
.
asc
(),
Testreport
.
create_time
.
desc
())
result
=
res
.
paginate
(
page
=
page_num
,
per_page
=
page_size
)
total_sum
=
result
.
total
res
=
result
.
items
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"请求成功"
,
"data"
:
{
"rows"
:
sqlOrmToJson
(
res
),
"total"
:
total_sum
}})
@
tool
.
route
(
'/testport/setauditor'
,
methods
=
[
"POST"
])
def
setauditor
():
ids
=
request
.
json
.
get
(
"ids"
)
res
=
db
.
session
.
query
(
Audituser
.
user_id
)
.
all
()
oldids
=
map
(
lambda
x
:
x
.
user_id
,
res
)
adds
=
[
i
for
i
in
ids
if
i
not
in
oldids
]
dels
=
[
i
for
i
in
oldids
if
i
not
in
ids
]
db
.
session
.
query
(
Audituser
)
.
filter
(
Audituser
.
user_id
.
in_
(
dels
))
.
delete
()
[
db
.
session
.
add
(
Audituser
(
user_id
=
i
))
for
i
in
adds
]
db
.
session
.
commit
()
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"更新成功"
})
@
tool
.
route
(
'/testport/getauditors'
,
methods
=
[
"GET"
])
def
getauditors
():
res
=
db
.
session
.
query
(
User
.
id
,
User
.
name
)
.
filter
(
Audituser
.
user_id
==
User
.
id
)
.
all
()
res
=
list
(
map
(
lambda
x
:
x
.
id
,
res
))
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"请求成功"
,
"data"
:
res
})
@
tool
.
route
(
'/testport/audit'
,
methods
=
[
"POST"
])
def
audit
():
if
db
.
session
.
query
(
Audituser
)
.
filter_by
(
user_id
=
session
.
get
(
"id"
))
.
count
()
!=
1
:
db
.
session
.
close
()
return
jsonify
({
"code"
:
403
,
"message"
:
"你无权操作"
}),
403
db
.
session
.
query
(
Testreport
)
.
filter
(
Testreport
.
id
==
request
.
json
.
get
(
"id"
))
.
update
(
request
.
json
)
db
.
session
.
commit
()
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"操作成功"
})
@
tool
.
route
(
'/testport/getreport/<id_>'
,
methods
=
[
"GET"
])
def
getreport
(
id_
):
res
=
db
.
session
.
query
(
Testreport
.
report_data
,
Testreport
.
type
,
Testreport
.
audit_status
,
Testreport
.
save_remark
,
Testreport
.
create_user
)
.
filter_by
(
id
=
id_
)
.
first
()
self_
=
res
.
create_user
==
session
.
get
(
"id"
)
and
True
or
False
db
.
session
.
close
()
res
=
sqlOrmToJson
(
res
)
del
res
[
"create_user"
]
res
[
"self"
]
=
self_
return
jsonify
({
"code"
:
200
,
"message"
:
"请求成功"
,
"data"
:
res
})
@
tool
.
route
(
'/testport/delreport/<id_>'
,
methods
=
[
"DELETE"
])
def
delreport
(
id_
):
res
=
db
.
session
.
query
(
Testreport
)
.
filter_by
(
id
=
id_
)
if
res
.
first
()
.
create_user
!=
session
.
get
(
"id"
):
db
.
session
.
close
()
return
jsonify
({
"code"
:
403
,
"message"
:
"你无权操作"
}),
403
res
.
delete
()
db
.
session
.
commit
()
db
.
session
.
close
()
return
jsonify
({
"code"
:
200
,
"message"
:
"删除成功"
})
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment