测试管理_利用python连接禅道数据库并统计bug数据到钉钉
这篇不多赘述,直接上代码文件。
另文章基础参考博文:参考博文
加以我自己的需求优化而成。
统计的前提
以下代码统计的前提是禅道的提bug流程应规范化
- bug未解决不删除
- bug未关闭不删除
db_config.py
连接数据库的操作
#coding=utf-8
#!/usr/bin/python3class mysql_config():'''def __init__(self,name):#print 'aaaa'self.name = nameprint name'''def get_config(self, name):#建立一个配置,后续通过这个配置和数据库建立连接,创建游标self.name = nameconfig = {'zentao': {'host': '192.****.****.*','user': '****','passwd': '','db': 'zentao','port': ****,'charset': 'utf8' # 这里是解决中文乱码情况},}return config[name]
mysql.py
连接禅道数据库并统计的操作
#coding=utf-8
#!/usr/bin/python3#import MySQLdb
import re
import pymysql
import datetime
from db_config import mysql_config
m_config = mysql_config()#实例化mysql_config类class db_mysql():def __init__(self):print('class:db_mysql -import -true')# 离职人员u.deleted=1# 连接mysqldef connect(self, name):# self.sql = sqlself.name = nameconfig = m_config.get_config(name) # 引用配置的获取链接方法db = pymysql.connect(**config)#建立连接cursor = db.cursor()#建立游标return cursor# 执行Sql语句def execute(self, cursor, sql):cursor.execute(sql)return cursor# 获取全部结果def fetchall(self, cursor):data = cursor.fetchall()# print("获取所有结果fetchall:"+data)return data# 获取一个结果def fetchone(self, cursor):data1=cursor.fetchone()# print("获取单个结果fetchall:" + data1)return data1# 查询bug汇总(年度)就只统计个总数,总新增bug总数包含已关闭的# 本年度新增bug总情况,如效果图1def bug_total(self, cursor, now,begin_day,total_day):"""最近总的BUG情况统计统计:param: day 根据输入天数:return:"""#在"%s" and "%s"期间 新增bug数new_near_bug_sql = """SELECT COUNT(*) as new_near_bug from zt_bug where openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now)cursor.execute(new_near_bug_sql)new_near_bug = cursor.fetchone()# print("新增bug数")# print(new_near_bug)# 在"%s" and "%s"期间已解决bug数close_bug_sql = """SELECT COUNT(*) as close_bug from zt_bug where status = "closed" and openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now)#"%s"表示这里将被替换成一个新的字符串,依次为recent_sevenday, nowcursor.execute(close_bug_sql)close_bug = cursor.fetchone()# print("已解决bug数")# print(close_bug)# 在"%s" and "%s"期间未解决bug数open_bug_sql = """SELECT COUNT(*) as open_bug from zt_bug where status = "active" and openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now)cursor.execute(open_bug_sql)open_bug = cursor.fetchone()# print("没解决bug数",)# print(open_bug)# 在"%s" and "%s"期间已解决待验证bug数close_unbug_sql = """SELECT COUNT(*) as close_unbug from zt_bug where status = "resolved" and openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now)cursor.execute(close_unbug_sql)close_unbug = cursor.fetchone()# print("已解决待验证bug数", )# print(close_unbug)# 在"%s" and "%s"期间,研发人员发生bug数排行。条件:时间,不剔除已删除的bug,如效果图3dev_bug_sql = """SELECT COUNT(*) num,realname FROM zt_bug b INNER JOIN zt_user u ON u.account = b.resolvedByWHERE DATE_FORMAT(b.openedDate, '%%Y-%%m-%%d') between '%s' and '%s'AND u.deleted='0' AND u.role='dev' GROUP BY b.resolvedBy ORDER BY num DESC;""" % (total_day, now)cursor.execute(dev_bug_sql)dev_bug = cursor.fetchall()print("研发人员发生bug数降序排行", )# print(dev_bug)add_str_dev_bug = '' # 空字符串#让降序排行一行一人for _tuple in dev_bug:a = ' \n \n {0}'.format(_tuple)add_str_dev_bug += str(a)# print(add_str_dev_bug)# 在"%s" and "%s"期间研发人员BUG被激活次数(非一次性修复),不剔除已删除的bug,如效果图4activation_bug_sql = """SELECT SUM(激活次数) 激活次数,中文姓名 FROM (SELECT b.id,COUNT(*) 激活次数,u.realname 中文姓名 FROMzt_bug b INNER JOIN zt_action a ON a.objectID = b.id INNER JOIN zt_user u ON u.account = b.resolvedByWHERE DATE_FORMAT(b.openedDate, '%%Y-%%m-%%d') between "%s" and "%s" AND a.objectType = 'bug' AND a.action = 'activated'AND u.deleted='0' and u.role='dev'GROUP BY b.id ORDER BY 激活次数 DESC) tem GROUP BY tem.中文姓名 ORDER BY 激活次数 DESC;""" % (total_day, now)cursor.execute(activation_bug_sql)activation_bug = cursor.fetchall()print("研发人员BUG被激活次数(非一次性修复)", )print(activation_bug)add_str_activation_bug = '' # 空字符串#让降序排行一行一人for _tuple in activation_bug:a = ' \n \n {0}'.format(_tuple)#(Decimal('19')中的(Decimal('')去掉a = re.sub('[(Decimal('')]', '', a)# print(a)add_str_activation_bug += str(a)# print("拆开元组,研发人员BUG被激活次数(非一次性修复)", )# print(add_str_activation_bug)# 提BUG、指派、转派【已解决待验证】、改BUG,=====================\n\n研发人员BUG被激活次数(非一次性修复):{5}\n\nstatistics_bug = "本年度新增bug总情况({6}~{7}) \n\n 新增BUG数:{0} \n\n 未关闭BUG数:{1} \n\n 已关闭BUG数:{2} \n\n 已解决待验证BUG数:{3}\n\n =====================\n\n研发人员今年(2023)BUG数倒序:\n\n{4}\n\n =====================\n\n研发人员今年BUG被激活次数(非一次性修复):\n\n{5}\n\n".format(new_near_bug[0], open_bug[0], close_bug[0], close_unbug[0], add_str_dev_bug, add_str_activation_bug, total_day, now)# print("bug的统计汇总:"+statistics_bug)return statistics_bug# 查询bug汇总(本周),就只统计个总数,总新增bug总数包含已关闭的# 本周新增bug总情况def bug_total_week(self, cursor, begin_day, now):"""最近总的BUG情况统计统计:param: day 根据输入天数:return:"""# 在"%s" and "%s"期间新增bug数new_near_bug_sql = """SELECT COUNT(*) as new_near_bug from zt_bug where openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now)cursor.execute(new_near_bug_sql)new_near_bug = cursor.fetchone()# print("新增bug数")# print(new_near_bug)# 在"%s" and "%s"期间已解决bug数close_bug_sql = """SELECT COUNT(*) as close_bug from zt_bug where status = "closed" and openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now) # "%s"表示这里将被替换成一个新的字符串,依次为recent_sevenday, nowcursor.execute(close_bug_sql)close_bug = cursor.fetchone()# print("已解决bug数")# print(close_bug)# 在"%s" and "%s"期间未解决bug数open_bug_sql = """SELECT COUNT(*) as open_bug from zt_bug where status = "active" and openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now)cursor.execute(open_bug_sql)open_bug = cursor.fetchone()# print("没解决bug数",)# print(open_bug)# 在"%s" and "%s"期间已解决待验证bug数close_unbug_sql = """SELECT COUNT(*) as close_unbug from zt_bug where status = "resolved" and openedDate between "%s" and "%s" and deleted='0';""" % (begin_day, now)cursor.execute(close_unbug_sql)close_unbug = cursor.fetchone()# print("已解决待验证bug数", )# print(close_unbug)# 提BUG、指派、转派、改BUG,=====================\n\n研发人员BUG被激活次数(非一次性修复):{5}\n\n# print("bug的统计汇总:"+statistics_bug)statistics_bug_week = "=====================\n\n本周新增bug总情况({4}~{5}) \n\n 新增BUG数:{0} \n\n 未关闭BUG数:{1} \n\n 已关闭BUG数:{2} \n\n 已解决待验证BUG数:{3}\n\n =====================\n\n".format(new_near_bug[0], open_bug[0], close_bug[0], close_unbug[0], begin_day, now)return statistics_bug_week# 查询bug明细,具体以人为维度,统计每个人的bug明细,以年为维度def bug_detail_year(self, cursor, this_year, now):"""包含姓名的年度明细最近总的BUG情况统计明细数据:param: day 根据输入天数:return:"""cursor.execute("""select "%s" as 开始时间 ,"%s" as 结束时间,u.realname as 姓名 ,count(*) as 总bug数, sum(case when b.status="active" then 1 else 0 end) as 未解决bug数 from zt_bug b left join zt_user u on b.assignedTo = u.account AND u.deleted='0' and u.role='dev' where b.deleted='0' AND b.status and b.openedDate BETWEEN "%s" and "%s" group by b.assignedTo order by 未解决bug数 desc;""" % (this_year, now, this_year, now))data = cursor.fetchall() # 把sql执行的结果赋值给data,data的格式为元组,元组套元组data2 = data[1:] # 取除列表第一个外的其他数据# print(data2)add_str_year = '' # 空字符串for _tuple in data2:# print("_tuple:"+str(_tuple))if _tuple[2] is None:print("姓名为空不输出")elif _tuple[4]==0:print("累计未解决等于0,不输出")else:a = '\n \n 姓名:{0},累计未解决bug数:{1}'.format(_tuple[2], _tuple[4])add_str_year += aadd_str_year="\n \n =====================\n \n历史遗留:未解决bug数统计 "+add_str_year# add_str_1="以下是研发人员现存bug统计明细 \n \n"+add_str# print("add_str--year,bug明细:" + add_str_year)return cursor, add_str_year# 查询bug明细,具体以人为维度,统计每个人的bug明细,如效果图2def bug_detail(self, cursor, recent_sevenday,now):"""本周的带姓名明细最近总的BUG情况统计明细数据:param: day 根据输入天数:return:"""db = db_mysql()cursor.execute("""select "%s" as 开始时间 ,"%s" as 结束时间,u.realname as 姓名 ,count(*) as 总bug数, sum(case when b.status="active" then 1 else 0 end) as 未解决bug数,sum(case when b.status="resolved" then 1 else 0 end) as 已解决待验证bug数,sum(case when b.status="closed" then 1 else 0 end) as 已解决bug数 from zt_bug b left join zt_user u on b.assignedTo = u.account AND u.deleted='0' where b.deleted='0' AND b.status and b.openedDate BETWEEN "%s" and "%s" group by b.assignedTo order by 总bug数 desc;""" % (recent_sevenday, now, recent_sevenday, now))data = cursor.fetchall()#把sql执行的结果赋值给data,data的格式为元组,元组套元组# data2=data[1:]# print(data2)add_str = ''#空字符串for _tuple in data:# print("_tuple:"+str(_tuple))if _tuple[2] is None:print("空值不输出")else:name = _tuple[2]a = '\n \n =====================\n \n 姓名:{2}\n \n开始时间:{0}\n \n结束时间:{1}\n \n总bug数:{3}\n \n未解决bug数:{4}\n \n已解决待验证bug数:{5}\n \n'.format(_tuple[0], _tuple[1], name, _tuple[3], _tuple[4],_tuple[5])add_str += a# add_str=add_str+db.bug_detail_year(db.connect('zentao'), "2018-01-01", "2023-04-13")[1]+a# add_str_1="以下是研发人员现存bug统计明细 \n \n"+add_strprint("add_str:"+add_str)return cursor, add_str#统计bug明细详情titledef bug_detail_title_delay(self, cursor, now):"""包含姓名的年度明细,bug明细详情,bug标题最近总的BUG情况统计明细数据:param: day 根据输入天数:return:"""cursor.execute("""SELECT u.realname as 姓名 ,b.title AS 已延期bug标题FROM zt_bug b left join zt_user u on b.assignedTo = u.account WHERE DATEDIFF(b.deadline,"%s") <=0 and b.status="active" and u.deleted="0" and u.role="dev" and b.deleted="0";"""% (now))data = cursor.fetchall() # 把sql执行的结果赋值给data,data的格式为元组,元组套元组# # data2 = data[1:] # 取除列表第一个外的其他数据data2 = data[0:]print(data2)add_str_year = '' # 空字符串for _tuple in data2:print("_tuple:"+str(_tuple))a = '\n \n 姓名:{0},bug标题:{1} \n '.format(_tuple[0], _tuple[1])add_str_year += aadd_str_year="\n \n =====================\n \n =====================\n \n =====================\n \n 延期未处理bug明细如下: "+add_str_yearprint(add_str_year)return add_str_year# 统计延期bug明细详情titledef bug_detail_title_year(self, cursor, this_year, now):"""包含姓名的年度明细,bug明细详情,bug标题最近总的BUG情况统计明细数据:param: day 根据输入天数:return:"""cursor.execute("""select "%s" as 开始时间 ,"%s" as 结束时间,u.realname as 姓名 ,b.title AS bug标题 from zt_bug b left join zt_user u on b.assignedTo = u.account where u.deleted='0' AND b.deleted='0' AND b.`status`='active' and b.openedDate BETWEEN "%s" and "%s" order by 姓名 desc;""" % (this_year, now,# now,this_year, now))# '''data = cursor.fetchall() # 把sql执行的结果赋值给data,data的格式为元组,元组套元组# # data2 = data[1:] # 取除列表第一个外的其他数据data2 = data[0:]# print("data2~~~")# print(data2)add_str_year = '' # 空字符串for _tuple in data2:print("_tuple:" + str(_tuple))if _tuple[2] is None:print("姓名为空不输出")# elif _tuple[4]==0:# print("累计未解决等于0,不输出")else:# # a = '\n \n 姓名:{0},累计未解决bug数:{1}'.format(_tuple[2], _tuple[4])a = '\n \n \n \n 姓名:{0},标题:{1}'.format(_tuple[2], _tuple[3])#add_str_year += aadd_str_year = "\n \n =====================\n \n历史遗留:bug明细 " + add_str_year# add_str_1="以下是研发人员现存bug统计明细 \n \n"+add_strprint("add_str--year:" + add_str_year)print("历史bug_title" + str(data))return add_str_year#关于年度未解决bug数的html表,效果如下图:点击查看历史遗留bug详情效def bug_detail_title_year2(self, cursor, this_year, now):"""包含姓名的年度明细,bug明细详情,bug标题最近总的BUG情况统计明细数据:param: day 根据输入天数:return:"""cursor.execute(
"""
select p.name AS 项目名称 ,"%s" as 开始时间 ,"%s" as 结束时间,u.realname as 姓名 ,b.title AS bug标题,b.id AS bug_id
from zt_bug b
left join zt_user u on b.assignedTo = u.account
left join zt_project p on p.id = b.project
where u.deleted='0' AND b.deleted='0' AND b.`status`='active'
and b.openedDate BETWEEN "%s" and "%s" order by 姓名 desc;
"""% (this_year, now,# now,this_year, now))data = cursor.fetchall()return data# 关于延期未解决bug数的html表def bug_delay_bug_year(self, cursor, now):"""包含姓名的年度明细,bug明细详情,bug标题最近总的BUG情况统计明细数据:param: day 根据输入天数:return:"""cursor.execute(
"""SELECT p.name AS 项目名称 ,u.realname as 姓名 ,b.title AS 已延期bug标题, b.keywords AS 原因
FROM zt_bug b left join zt_user u on b.assignedTo = u.account
left join zt_project p on p.id = b.project
WHERE DATEDIFF(b.deadline,"%s") <=0 and b.status="active" and u.deleted="0"
and u.role="dev" and b.deleted="0" group by 已延期bug标题 order by 项目名称 desc;
"""% (now))data = cursor.fetchall()return data#以下为加入参数测试上述sql统计
now = datetime.datetime.now().strftime("%Y-%m-%d")
db=db_mysql()
# db.bug_detail_year(db.connect('zentao'), "2020-04-07", "2023-08-25")
#禅道bug标题明细
# db.bug_detail_title_year(db.connect('zentao'), "2023-01-01", now)
#禅道延期bug明细
# db.bug_detail_title_delay(db.connect('zentao'), now)
# db.bug_detail(db.connect('zentao'), "2023-09-18", "2023-09-25")
# db.bug_total(db.connect('zentao'),now,"2023-01-01", "2023-01-01")
一周统计
每个开发维度的统计
所有开发bug总数的倒序
所有开发bug激活数的倒序
dingtalk.py
统计结果发送到钉钉的操作
#coding=utf-8
#!/usr/bin/python3import json
import urllib.request
import datetime
from dingtalkchatbot.chatbot import DingtalkChatbot, FeedLinkclass ding_talk():def send_bug(self,url, data_file,sign_mark):#设置发送的信息样式,效果如下截图''':param url: 钉钉机器人的webhook:param data_file: 查看详情中的markdown信息:param sign_mark: 用户而可以自定义为本周、还是本月的禅道BUG情况统计:return:'''xiaoding = DingtalkChatbot(url)# Markdown消息@所有人now = datetime.datetime.now().strftime("%Y%m%d")# print(now)xiaoding.send_markdown(title='BUG统计%s' % (now),text='**禅道BUG情况统计**\n\n 各位同事,以下是禅道BUG情况统计。统计结果供各组组长参考,烦请做好督促,尽快处理bug!\n\n=====================\n\n {0} \n\n[查看详情](http://192.****.****.*/zentao/html/bug_detail_{1}.html) \n'.format(data_file,now), is_at_all=False)def bug_html(self, lis, html_file):"""对查询bug明细转html文件:param lis:param html_file"""conten_title = []for key in lis.description:conten_title.append(key[0])a = "</th><th>".join(conten_title)con_title = "<tr><th>" + a + "</th></tr>"conten_val = ""con = ""lis_arr = lis.fetchall()for i in range(0, len(lis_arr)):for index, v in enumerate(lis_arr[i]):if index == 0:conten_val = "<tr><td>" + lis_arr[i][index] + "</td><td>"con = con + conten_val;continuecon = con + str(lis_arr[i][index]) + "</td><td>"con = con[0:-2] + "r>"con = con + "\n"head = """<meta charset="utf-8"><style type="text/css">table.tftable {font-size:12px;color:#333333;width:100%;border-width: 1px;border-color: #9dcc7a;border-collapse: collapse;}table.tftable th {font-size:12px;background-color:#abd28e;border-width: 1px;padding: 8px;border-style: solid;border-color: #9dcc7a;text-align:left;}table.tftable tr {background-color:#ffffff;}table.tftable td {font-size:12px;border-width: 1px;padding: 8px;border-style: solid;border-color: #9dcc7a;}</style>\n<table id="tfhover" class="tftable" border="1">\n"""last = "</table>"htm = head + con_title + con + lastwith open(html_file, "w", encoding="utf-8") as f:f.write(htm)
钉钉通知效果
禅道BUG情况统计各位同事,以下是禅道BUG情况统计。统计结果供各组组长参考,烦请做好督促,尽快处理bug!==========================================本周新增bug总情况(2023-01-28~2023-02-04)新增BUG数:19未关闭BUG数:1已关闭BUG数:17已解决待验证BUG数:1=====================本年度新增bug总情况(2023-01-01~2023-02-04)新增BUG数:2231未关闭BUG数:3已关闭BUG数:2210已解决待验证BUG数:18=====================研发人员今年(2023)BUG数倒序:'27', '周**''24', '吴**''18', '郑**''12', '王**'=====================研发人员今年BUG被激活次数(非一次性修复):'27', '周**''24', '吴**''18', '郑**''12', '王**'=====================姓名:周**开始时间:2023-01-28结束时间:2023-02-04总bug数:1未解决bug数:0已解决待验证bug数:1=====================姓名:吴**开始时间:2023-01-28结束时间:2023-02-04总bug数:1未解决bug数:1已解决待验证bug数:0=====================历史遗留:未解决bug数统计姓名:吴**,累计未解决bug数:1查看历史遗留bug详情
点击查看历史遗留bug详情效果如下
run.py
所有代码文件执行入口
#coding=utf-8
#!/usr/bin/python3import datetimefrom mysql import db_mysql
from dingtalk import ding_talk
ding_ding = ding_talk()
mysql_obj = db_mysql()#获取时间:
now = datetime.datetime.now().strftime("%Y-%m-%d")now1=(datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y-%m-%d")#当前时间加一天,并展示为2023-01-01形式;如果只获取当前时间,则只统计到当前时间凌晨0点
print("~~~now1的,时间"+now1)
recent_sevenday = (datetime.datetime.now() - datetime.timedelta(days=7)).strftime("%Y-%m-%d")
print("~~~一周开始的时间"+recent_sevenday)
this_year = "2023-01-01"
print("~~~今年的时间"+this_year)#bug明细写入html
cursor_connect = mysql_obj.connect('zentao')#先连接数据库
sign_mark="本周"
#因为bug_detail有两个返回值,所以要赋值给两个变量。如果只取一个就用索引recent_sevenday,#查询延期未处理bug明细,姓名+标题
# add_str_delay = mysql_obj.bug_detail_title_delay(cursor_connect, now)
#查询本周bug明细,具体到每个人有多少未处理
cursor_execute, add_str = mysql_obj.bug_detail(cursor_connect, recent_sevenday, now)
#查询历史bug明细,只显示姓名和bug数
cursor_execute_year, add_str_year = mysql_obj.bug_detail_year(cursor_connect, "2015-01-01", now)
#以下详细的明细。统计开发人员未解决bug数,已关闭bug数,已解决待关闭bug数的统计时间,bug总数降序和bug被激活的时间,print("seven_count:"+seven_count)
seven_count = mysql_obj.bug_total(cursor_connect, now, this_year, this_year)#最近总的BUG情况统计,第一个是连接数据库;第二个是当日时间;
#以下简单的数量统计。统计本周新增bug总情况,print("seven_count_week:"+seven_count_week)
seven_count_week=mysql_obj.bug_total_week(cursor_connect, recent_sevenday, now)#统计历史bug的标题明细
# 生成html
seven_count_title2 = mysql_obj.bug_detail_title_year2(cursor_connect, this_year, now)
# 不生成html + seven_count_title历史bug的标题明细
# seven_count_title = mysql_obj.bug_detail_title_year(cursor_connect, this_year, now)
#全部都展示出来
seven_count=seven_count_week + seven_count + add_str + add_str_year# +add_str_delay+seven_count_title#只展示
# seven_count=add_str_delay+add_str#将统计生成的html放在windows的路径,如要换在服务器上,要换对应路径
# html = r"D:\03code\zentao -git版本 - 增加bug明细\test.html"
#将统计生成的html放在 服务器的指定路径,上传git版本
html = "/home/tomcat/tomcat7/webapps/test/bugdetail.html".format(now)
ding_ding.bug_html2(seven_count_title2, html)print("run_week的输出~~~~~~~~~~~~~~~~~~~~"+seven_count)
#测试组1
# url="https://oapi.dingtalk.com/robot/send?access_token=****"
#研发组
url="https://oapi.dingtalk.com/robot/send?access_token=****"
ding_ding.send_bug(url, seven_count, sign_mark)#ding_talk的send_bug方法,发送send_markdown;def send_bug(self,url, data_file,sign_mark)# +add_str_delay+seven_count_title#只展示
# seven_count=add_str_delay+add_str#将统计生成的html放在windows的路径,如要换在服务器上,要换对应路径
# html = r"D:\03code\zentao -git版本 - 增加bug明细\test.html"
#将统计生成的html放在 服务器的指定路径,上传git版本
html = "/home/tomcat/tomcat7/webapps/test/bugdetail.html".format(now)
ding_ding.bug_html2(seven_count_title2, html)print("run_week的输出~~~~~~~~~~~~~~~~~~~~"+seven_count)
#测试组1
# url="https://oapi.dingtalk.com/robot/send?access_token=****"
#研发组
url="https://oapi.dingtalk.com/robot/send?access_token=****"
ding_ding.send_bug(url, seven_count, sign_mark)#ding_talk的send_bug方法,发送send_markdown;def send_bug(self,url, data_file,sign_mark)