Commit e0181a22 authored by Wallen姚文辉's avatar Wallen姚文辉

添加安卓自动化测试工具

parent aeedd0d6
......@@ -11,6 +11,9 @@ andriodStatusSearchUser=[]
iosStatusSearchUser=[]
appiumLogUser=[]
def usbInfoFlush(data):
......@@ -40,6 +43,12 @@ def IosConnectInfo(data):
print(data)
for each in iosStatusSearchUser:
server.socketio.emit('iosinfo',data,to=each)
def sendappiumLog(data):
print("发送日志")
print(data)
for each in appiumLogUser:
server.socketio.emit('sendappiumLog',data,to=each)
onEvent=[
["主机服务","列表刷新",usbInfoFlush],
......@@ -66,6 +75,9 @@ def disconnect():
request.sid in globalStatusSearchUSer and globalStatusSearchUSer.remove(request.sid)
request.sid in andriodStatusSearchUser and andriodStatusSearchUser.remove(request.sid)
request.sid in iosStatusSearchUser and iosStatusSearchUser.remove(request.sid)
request.sid in appiumLogUser and appiumLogUser.remove(request.sid)
if not appiumLogUser:
server.listener.off("主机服务","appium日志",sendappiumLog)
@server.socketio.on('usbInfo')
def usbInfo():
......@@ -103,43 +115,64 @@ def leaveAdbinfo():
def getStatus():
iosStatusSearchUser.append(request.sid)
# @server.socketio.on('connectUsb')
# def connectUsb(key):
# print(key)
# server.listener.emit("连接usb",key)
@server.socketio.on('connectUsb')
def connectUsb(key):
print(key)
server.listener.emit("连接usb",key)
# @server.socketio.on('killproxy')
# def killproxy():
# server.listener.emit("杀死转发链路")
@server.socketio.on('killproxy')
def killproxy():
server.listener.emit("杀死转发链路")
# @server.socketio.on('restartAppium')
# def restartAppium():
# server.listener.emit("重启appium")
@server.socketio.on('restartAppium')
def restartAppium():
'''
重启appium
'''
server.listener.emit("重启appium")
print("重启appium")
# @server.socketio.on('startAppium')
# def restartAppium():
# print("启动appium")
# server.listener.emit("启动appium")
@server.socketio.on('startAppium')
def restartAppium():
print("启动appium")
server.listener.emit("启动appium")
# @server.socketio.on('connectIOS')
# def connectIos(WDAID):
# print(WDAID)
# server.listener.emit("连接ios",WDAID)
@server.socketio.on('connectIOS')
def connectIos(WDAID):
print(WDAID)
server.listener.emit("连接ios",WDAID)
# @server.socketio.on('disconnectIOS')
# def connectIos(WDAID):
# server.listener.emit("断开ios连接",WDAID)
@server.socketio.on('disconnectIOS')
def connectIos(WDAID):
server.listener.emit("断开ios连接",WDAID)
@server.socketio.on('appiumLog')
def appiumLog():
if not appiumLogUser:
print("添加监听")
server.listener.on("主机服务","appium日志",sendappiumLog)
appiumLogUser.append(request.sid)
print(appiumLogUser)
@server.socketio.on('leaveappiumLog')
def appiumLog():
request.sid in appiumLogUser and appiumLogUser.remove(request.sid)
if not appiumLogUser:
print("移除监听")
server.listener.off("主机服务","appium日志",sendappiumLog)
print(appiumLogUser)
# server.listener.addevent("连接usb","触发usbip绑定事件,参数为需要连接的key")
# server.listener.addevent("杀死转发链路","杀死ssh隧道转发")
# server.listener.addevent("重启appium","重启appium服务")
# server.listener.addevent("启动appium","启动appium服务")
# server.listener.addevent("连接ios","连接ios,参数为webdriveragentId")
# server.listener.addevent("断开ios连接","断开所有ios的连接")
server.listener.addevent("连接usb","触发usbip绑定事件,参数为需要连接的key")
server.listener.addevent("杀死转发链路","杀死ssh隧道转发")
server.listener.addevent("重启appium","重启appium服务")
server.listener.addevent("启动appium","启动appium服务")
server.listener.addevent("连接ios","连接ios,参数为webdriveragentId")
server.listener.addevent("断开ios连接","断开所有ios的连接")
# for each in onEvent:
# server.listener.on(*each)
for each in onEvent:
server.listener.on(*each)
......@@ -5,15 +5,14 @@ from flask_socketio import SocketIO
from datetime import timedelta
from manager.tools import get_config
from jenkins import Jenkins
# from lisenterClinet import server as lisenterServer
from lisenterClinet import server as lisenterServer
config=get_config()
mysqlConf=config["mysql"]
Jenkinsconf=config["jenkins"]
# centerPath=config['other']['listenCenterPath']
centerPath=config['other']['listenCenterPath']
# print(centerPath)
class server():
app = Flask(__name__, template_folder='auto_test/html', static_folder='')
......@@ -26,7 +25,7 @@ class server():
socketio = SocketIO()
socketio.init_app(app, cors_allowed_origins='*')
auto_jenkins=Jenkins(Jenkinsconf["address"],Jenkinsconf["user"],Jenkinsconf["password"])
# listener=lisenterServer('测试中心',centerPath)
listener=lisenterServer('测试中心',centerPath)
......
import subprocess
import time
import os
def test(path):
bath_path='~'
def strftime(type,str):
'''
type 1-包含年 0-不包含
'''
a=type and time.strptime(str, "%b-%d-%Y") or time.strptime("Jun-17 18:58", "%b-%d %H:%M")
return time.strftime("%m-%d %H:%M:%S", a)
k=lambda x:':' in x and strftime(0,x) or strftime(1,x)
a=subprocess.getoutput('cd %s && ls -l' % os.path.join(bath_path,path))
b=a.split('\n')[1:]
for i in range(len(b)):
b[i]=b[i].split()
s=list(map(lambda x:{'floder':x[0][0]=='d' and True or False,'size':x[4],'update': k(x[5]+'-'+x[6]+(':' in x[7] and ' '+x[7] or '-'+x[7])),'name':x[8]},b))
return s
print(test('project-other/source'))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment