camunda 与 pycamunda
相关链接:
camunda 官方社区:https://docs.camunda.org/manual/7.17/
官方社区提供的REST_API:https://docs.camunda.org/manual/7.17/reference/rest/
GITHUB 社区:https://github.com/camunda-community-hub
GitHub开源库:https://github.com/camunda-community-hub
pycamunda的官方文档:https://pycamunda.readthedocs.io/en/latest/index.html
pycamunda相关介绍
1.介绍
主要使用到的功能分为3部分,创建用户,将camunda modeler文档部署到camunda服务器上,启动流程定义和流程实例,进行流程任务的管理
配置方式说明:
URL_CAMUNDA_ENGINE = "http://"+IP_ADDR+":8080/engine-rest"
其中的IP_ADDR需要换成camunda引擎部署机的IP,在多个函数中均会用到该内容
camunda文件的部署到启动过程包含:部署的创建,添加相关的资源文件,进行部署并获取部署的流程定义id,根据流程定义生成对应的流程实例,运行流程实例即开启了一个流程。
部署
在当前代码中采用的部署方式为单独创建部署,并且记录部署文件的"proc_key",
部署camunda操作使用:script.camunda_execute文件进行操作
每个camunda model文件包含一个唯一的proc_key
在生成一个部署后将该id 记录,每一个作业票的流程有对应的流程id
启动流程
在前端点击按键发起一个作业申请时,后端会执行启动一个流程的功能。启动流程的功能在下面进行介绍
完成相关任务
在前端页面有确认按键,以及同意或者拒绝,都会调用后端的端口,后端的实现是通过完成相关的任务实现的。
要启动流程需要知道一个流程的proc_def_id以及启动文件时所需的变量,
获取peoc_def_id 的方式:
def getlist_proc_def(key_name):#key = key_namegetlist = processdef.GetList(url=URL_CAMUNDA_ENGINE,key=key_name,latest_version=True,sort_by='version',ascending=False)proc_def = getlist()[0]return proc_def.id_
启动流程核心代码位于:utils.camunda_func.start_proc2(proc_def_id,variable),其中填写的proc_def_id 是由
获取任务列表
当前端访问接口时,要获取用户当前所需处理的任务,就需要访问获取某个用户需要执行的任务的函数 获取到的任务就是需要执行的任务,其他没有执行的任务的可以从对应的停送电日志表,维修日志表,质量检测日志表中获取到,其中的内容包含对应的状态
2.创建用户函数
pycamunda提供的函数
from pycamunda import user
def create_user(user_list):for users in user_list:create = user.Create(url=url, id_=users, first_name=users,last_name=users, password="123456")create()
# 说明:user.Create()在实例化后,需要进行执行才能创建出用户,create()不能省 url 为 URL_CAMUNDA_ENGINE = "http://"+IP_ADDR+":8080/engine-rest" 以下不再赘述
生成假数据的相关函数:
位置:script.postgresql_create_data 包含多个函数:添加部门,用户数据,以及将数据导入文件等操作
3.部署与启动流程相关
部署相关
from pycamunda import deployment
create_file = deployment.Create(url=url, name=filepath.split(".")[0], )
create_file.add_resource(content)
deploy = create_file()
def create_deploy_single(bpmnfile):filepath = bpmnfilewith open(filepath, "rb") as ff:content = ff.read()create_file = deployment.Create(url=url, name=filepath.split(".")[0], )create_file.add_resource(content)deploy_dic = {}try:deploy = create_file()pro_dic = deploy.deployed_process_definitionsdeploy_id = deploy.id_deploy_dic.update({"deploy_id": deploy_id})#print(pro_dic)# print(deploy_dic)# items = pro_dic.items()# print(len(pro_dic),pro_dic,type(pro_dic))for key, value in pro_dic.items():pro_id = value.id_deploy_dic.update({"pro_id": pro_id,"key":value.key,"name":value.name})return deploy_dicexcept Exception as e:print(e,sys._getframe().f_code.co_name)return deploy_dic
流程定义和流程实例
获取流程定义id
from utils import camunda_func
proc_def_id = camunda_func.getlist_proc_def(key_name=proc_key)
根据流程定义id生成和启动流程实例
from pycamunda import processdef,processinst
def start_proc2(proc_def_id,variables):# 部署后生成的proc_def_idtry:"""name: str, value: typing.Any, type_: str = None, value_info: str = None"""key = str(uuid.uuid4())#print(key)start_pro = processdef.StartInstance(url=url, id_=proc_def_id, business_key=key)for variable in variables:start_pro.add_variable(name=variable.get("name"),value=variable.get("value"),type_=variable.get("type"))process1 = start_pro()processinst.Activate(url=url, id_=process1.id_)#print(process1.id_)# camunda_api_url = url + '/process-definition/{}/form-variables'.format(str(proc_id))#return json.dumps({"code": 0, "message": "start instance successful","proc_id":process1.id_})#print( {"proc_inst_id":process1.id_})return {"code": 0, "message": "start instance successful","proc_inst_id":process1.id_}except Exception as e:print(e,sys._getframe().f_code.co_name)return {"code":1,"message":"start instance fail"}
其中涉及到了启动一个流程定义的实例,添加变量,对实例进行激活的操作
分别对应processdef.StartInstance,add_variables,
processinst.Activate
4.任务处理相关
任务列表获取
from pycamunda import task
def get_list_application_imformation2(assignee,taskname=None):getlist = task.GetList(url=URL_CAMUNDA_ENGINE,assignee=assignee,process_definition_key=setup.power_proc_key)tasktur = getlist()
代码实现在 utils.camunda_func.get_list_application_imformation2
任务完成
from pycamunda import taskresolve = task.Resolve(url=URL_CAMUNDA_ENGINE, id_=taskid)
var_label = item.get("label")
var = imformation_dic.get(var_label)
print(item,"#*"*30)
#status_judge(proc_inst_id,item,reason)
resolve.add_variable(name=var.get("id"), value=item.get("value"),
type_=var.get("type"),
value_info={})
resolve()
在代码中实现在utils.camunda_func.task_complete_variable
任务用户的委任
from pycamunda import task
set_assignee = task.SetAssignee(url=url,id_=task_id,user_id=user_name)
set_assignee()
代码实现在utils.set_assignee.user_task_set_assignee_complete函数以及该文件内其他函数中实现
5.定时设置任务实现
获取任务的执行部门的实现方式
代码位置:utils.camunda_func
def get_task_list_key(department,duty):getlist = task.GetList(url=URL_CAMUNDA_ENGINE,process_definition_key=setup.test_proc_key)tasktur = getlist()# print(tasktur)# print(duty)for item in tasktur:task_id = item.id_task_assignee = item.assigneeif task_assignee == department:mm = task.SetAssignee(url=URL_CAMUNDA_ENGINE,id_=task_id,user_id=duty)mm()
代码位置:script.set_assignee
namelist = ["生产管理部"]
url = "http://192.168.8.99:9000/department/monitor/"def set_assignee(namelist):for name in namelist:# 查看生产管理部当班人员是谁post_json = {"department": name}response_json = request_post(url, post_json)#print(response_json)monitor = response_json.get("账号")# 查看哪些任务的当前委任者为生产管理部# 修改任务的委任者为当值人员get_task_list_key(name, monitor)