前提
需要拥有管理员账号或获得开发者权限
准备
钉钉方面
1. 创建钉钉应用
钉钉开放平台——应用开发——钉钉应用——创建应用
2. 记录应用凭证
记录应用的AgentId、AppKey、AppSecret
3. 打开应用权限
打开需要获取数据的接口权限
开发环境方面
1.安装依赖库
dingtalk-sdk 消息加解密同时兼容 cryptography 和 PyCrypto, 优先使用 cryptography 库。 需先自行安装 cryptography 或者 PyCrypto 库:
# 安装 cryptography
pip install cryptography>=0.8.2# 或者安装 PyCrypto
pip install pycrypto>=2.6.1
2.安装 dingtalk-sdk
pip install dingtalk-sdk# with cryptography
pip install dingtalk-sdk[cryptography]# with pycrypto
pip install dingtalk-sdk[pycrypto]
升级 dingtalk-sdk 到新版本
pip install -U dingtalk-sdk
ding_talk-sdk 使用文档https://dingtalk-sdk.readthedocs.io/zh_CN/latest/index.html
基础接口
1. 获得access_token
dingtalk-sdk方法
from dingtalk import SecretClient, AppKeyClientclient = SecretClient('corp_id', 'secret') # 旧 access_token 获取方式
client = AppKeyClient('corp_id', 'app_key', 'app_secret') # 新 access_token 获取方式
自建接口方法
# 获得access_token
def access_token():response = requests.get("https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s" % (AppKey, AppSecret))if response.status_code == 200:str_res = response.texttoken = (json.loads(str_res)).get('access_token')return token
2. 获得部门列表
dingtalk-sdk方法
departments = client.department.list()
自建接口方法
# 获得部门列表
def get_department_list(dept_id=1):"""查询部门列表:param dept_id: 部门id,默认为1(根部门),可输入任意部门id"""post_url = "https://oapi.dingtalk.com/topapi/v2/department/listsub?access_token=%s" % (access_token())data = {"dept_id": dept_id,}response = requests.post(post_url, data)str_res = response.textresult = (json.loads(str_res)).get('result')return result
注意,无论是 dingtalk-sdk 方法还是自建接口方法,都只能获取一层部门的列表,比如总公司下面有市场部和技术部,市场部下面有研究部和发展部,技术部下面有研究部和开发部,当我们调用接口的时候,只能获取到市场部和技术部,无法获取下面的子部门。想全部获取则需要做循环。
3. 获得部门员工
dingtalk-sdk方法
from dingtalk.client.api import Userobject = User(client=client) # 传入上文的token,实例化对象
data = object.listbypage(department_id, offset=0, size=100, order='custom', lang='zh_CN')
自建接口方法
我懒得分别调用每个部门,直接找了个获取全部在职员工的接口,写成了生成器,在外面直接遍历获取就行
# 获得在职员工列表
def get_user_list():post_url = "https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/queryonjob?access_token=%s" % (access_token())# 初始化offsetoffset = 0while True:# 请求参数data = {"status_list": "2,3,5,-1","offset": offset,"size": 50}# 发送post请求response = requests.post(post_url, data)str_res = response.text# json解析result = (json.loads(str_res)).get('result')# 返回当前页列表yield result["data_list"]try:# 下一页(下50条)offset = result["next_cursor"]continue# 退出条件except KeyError:break
4. 获得用户详情
dingtalk-sdk方法
from dingtalk.client.api import Userobject = User(client=client) # 传入上文的token,实例化对象
data = object.get(userid, lang='zh_CN')
自建接口方法
# 获得员工详情(user_id)
def get_user_info(user_id):""":param user_id::return :"""post_url = "https://oapi.dingtalk.com/topapi/v2/user/get?access_token=%s" % (access_token())data = {"userid": user_id,}response = requests.post(post_url, data)str_res = response.textresult = (json.loads(str_res)).get('result')return result["name"], result["mobile"], result["userid"]
考勤统计接口
SDK方法已经没有啦~开始手撸~~~
1. 获取考勤报表列名称
我这里是把我需要的列的名称和id返回了,各位各取所需哈
def get_attcolumns():post_url = "https://oapi.dingtalk.com/topapi/attendance/getattcolumns?access_token=%s" % (access_token())response = requests.post(post_url)str_res = response.textresult = (json.loads(str_res)).get('result')name_list = ["应出勤天数", "出勤天数", "休息天数", "迟到次数", "迟到时长", "早退次数", "早退时长", "上班缺卡次数", "下班缺卡次数", "补卡次数", "旷工天数","加班-审批单统计", "加班总时长", "休息日加班", "工作日加班", "节假日加班"]response_data = {}for i in result["columns"]:try:print(i["name"] + ":" + str(i["id"]))except:print(i["name"] + ":NONE")if i["name"] in name_list:response_data[i["name"]] = i["id"]print(response_data)return response_data
2. 获取考勤报表列值
各取所需各取所需
def get_columnval(userid: str, name: str, column_id: str, from_date: str, to_date: str):"""from_date和to_date格式为:'2022-01-01 00:00:00'这样式的column_id为列名称id"""post_url = "https://oapi.dingtalk.com/topapi/attendance/getcolumnval?access_token=%s" % (access_token())data = {"userid": userid,"column_id_list": column_id,"from_date": from_date,"to_date": to_date}response = requests.post(post_url, data)str_res = response.textnum = 0try:result = (json.loads(str_res)).get('result').get("column_vals")for value in result[0]["column_vals"]:num += int(float(value["value"]))except:passreturn {name: num}
3. 获取员工考勤详情
这个接口有个问题,它两个时间的间隔不能超过七天..我就顺手也给时间搞定了。
还有个小建议,当你在外面调用接口的时候,肯定会有同时获取好多人考勤详情的情况,可以在调用的时候使用多线程,然后一次只传一个人的userid,效率会很高,不会搞线程的可以找我哈
# 查询员工考勤情况
def attendance_information(workDateFrom, workDateTo, userIdList):"""查询员工考勤情况:param workDateFrom: 查询开始时间 格式:"2022-03-10 00:00:00":param workDateTo: 查询结束时间 格式:"2022-03-10 00:00:00":param userIdList: 员工id列表:return: 员工考勤情况 type:dict"""num1 = int(time.mktime(time.strptime(workDateFrom, "%Y-%m-%d %H:%M:%S")))num2 = int(time.mktime(time.strptime(workDateTo, "%Y-%m-%d %H:%M:%S")))if num2 - num1 >= 604800:# 604800为七天的秒数response_data = []while True:if num1 >= num2:breakif num2 - num1 < 604800:end_num = num2else:end_num = num1 + 604799startTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(num1))endTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_num))data = attendance_info(startTime, endTime, userIdList)response_data += datanum1 = end_num + 1else:response_data = attendance_info(workDateFrom, workDateTo, userIdList)return response_data# 获取员工打卡结果
def attendance_info(workDateFrom, workDateTo, userIdList):post_url = "https://oapi.dingtalk.com/attendance/list?access_token=%s" % (access_token())data = {"workDateFrom": workDateFrom,"workDateTo": workDateTo,"userIdList": userIdList,"offset": 0,"limit": 50}response = requests.post(post_url, json.dumps(data))str_res = response.textrecordresult = (json.loads(str_res)).get('recordresult')return recordresult
4. 审批接口(请假、加班等)
总共分为两个步骤:
1. 获取审批实例ID列表
2. 获取审批实例详情
在调用第一个接口的时候,需要传入一个process_code参数,这个参数要在钉钉管理后台的审批模板编辑页面的URL中获取,这需要拿到OA审批的权限:
比如这个请假,点击最右侧的编辑,进入页面后,process_code就在上面这个位置,记录下来
有了这个code,就可以写接口调用了,这边我封装了一个类:
"""
请假processCode=PROC-EF6YNNYRN2-CRQI0Q3ZSC7L2TDQEKQO1-25SLJQZI-ST
加班processCode=PROC-EF6YNNYRN2-CRQI0Q3ZSC7L2TDQEKQO1-XZRLJQZI-GT
补卡processCode=PROC-EF6YNNYRN2-CRQI0Q3ZSC7L2TDQEKQO1-V2SLJQZI-NT
"""# 获取审批信息
class ProcessInstance:def __init__(self, process_code: str, start_time: int, end_time: int, userid: str):self.process_code = process_codeself.start_time = start_timeself.end_time = end_timeself.userid = userid# 获取审批实例id列表def processinstance_list(self):post_url = "https://oapi.dingtalk.com/topapi/processinstance/listids?access_token=%s" % (access_token())data = {"process_code": self.process_code,"start_time": self.start_time - 2678400000,"end_time": self.end_time + 2678400000,"userid_list": self.userid,"size": 20,"cursor": 0,}response = requests.post(post_url, data)str_res = response.textresult = (json.loads(str_res)).get('result').get("list")return result# 获取审批实例详情def getprocessinstance_details(self):lists = self.processinstance_list()response_list = []for i in lists:post_url = "https://oapi.dingtalk.com/topapi/processinstance/get?access_token=%s" % (access_token())data = {"process_instance_id": i,}response = requests.post(post_url, data)str_res = response.textresult = (json.loads(str_res)).get('process_instance')status = self.status_CN(result.get("status"))form_component_values = result.get("form_component_values")types = [x.get('value') for x in form_component_values if x.get("component_type") == "DDSelectField"][0]s_times = [(json.loads(x.get('value')))for x in form_component_values if x.get("component_type") == "DDDateRangeField"][0][0]if self.start_time <= self.date_to_UNIX(s_times) <= self.end_time:e_times = [(json.loads(x.get('value')))for x in form_component_values if x.get("component_type") == "DDDateRangeField"][0][1]days = [(json.loads(x.get('value')))for x in form_component_values if x.get("component_type") == "DDDateRangeField"][0][2]text = [x.get('value') for x in form_component_values if x.get("component_type") == "TextareaField"][0]response_list.append({"status": status, "types": types, "s_times": s_times, "e_times": e_times, "days": days,"text": text})print(response_list)return response_list# 获取天数def getprocessinstance_days(self):lists = self.processinstance_list()days = 0for i in lists:post_url = "https://oapi.dingtalk.com/topapi/processinstance/get?access_token=%s" % (access_token())data = {"process_instance_id": i,}response = requests.post(post_url, data)str_res = response.texttry:form_component_values = (json.loads(str_res)).get('process_instance').get("form_component_values")num = self.date_to_UNIX([(json.loads(x.get('value')))for x in form_component_values ifx.get("component_type") == "DDDateRangeField"][0][0])if self.start_time <= num <= self.end_time:days += float([(json.loads(x.get('value')))for x in form_component_values if x.get("component_type") == "DDDateRangeField"][0][2])except AttributeError:passreturn days@staticmethoddef date_to_UNIX(date: str):return int(time.mktime(time.strptime(date, "%Y-%m-%d"))) * 1000@staticmethoddef status_CN(status: str):if status == "NEW":return "新创建"elif status == "RUNNING":return "审批中"elif status == "COMPLETED":return "已完成"elif status == "CANCELED":return "已取消"
因为审批单发起的时间可能与审批单实际的时间不符,比如我明天要请假,我今天发起了请假审批,那我们传入明天的时间就会导致查不到这条审批,于是我就把传入的时间扩大了31天,等查到这个人在这个时间段内发起的审批单后,再根据审批单实际的时间把不符合查询时间筛选出去,这样就能最大程度的保证不遗漏。但是这样也有一个缺点,就是代码处理的时间变长了。
目前我用到的接口就是这些,然后就把数据处理成自己想要的样子,再放到前端页面上显示就可以啦。如果以后用到别的借口,还会继续在这个文章更新。可以简单看一下目前实现的效果