背景
需求将MongoDB数据入仓MaxCompute
环境说明
MongoDB
100+个Collections:orders_1、orders_2、…、orders_100
前期准备
1、MongoDB数据源配置
需要先保证DW和MongoDB网络是能够联通的,需要现在集成任务中配置MongoDB的数据源信息。
具体可以查看我的另外一篇:
2、赋值节点
选择赋值节点,赋值节点新增后打开,可以看到有Python、shell、ODPS SQL
Python 读取最后一次Print字符串,Shell读取最后一次echo输出的字符串,如”orders_1,order_2“ 就按照”,“逗号被拆分成2个元素用于后续循环
ODPS SQL 则是每一行是遍历的一个元素
每一次循环都会传入遍历的元素,如python :
print "orders_1,orders_2";
则会当做[“orders_1”,“orders_2”]数组进行遍历,每次一个元素会传入到遍历的循环中执行
实操界面:
print "orders_1,orders_2";
赋值节点会自动出现一个outputs给后面的节点读取
3、循环任务
新增完毕后进入到循环内部,会看到一个start 和end节点,这个时候我们再选择一个离线同步任务,将流程串起来
点开离线集成任务,切换到离线集成任务的脚本模式,赋值节点的collectionName会以”${dag.foreach.current}“ 参数传入到循环内部的流程中。
在集成任务脚本中,将对应的collectionName替换为 ${dag.foreach.current} 即可
{"transform": false,"type": "job","version": "2.0","steps": [{"stepType": "mongodb","parameter": {"objectIdOutputType": "json","useSplitVector": false,"datasource": "你的mongodb数据源名称","envType": 1,"cursorTimeoutInMs": "3600000","column": [{"name": "col_combine","type": "combine"}],"tableComment": "This kind of datasource dosen't support get table comment. This is a comment produced by di.","batchSize": "1000","collectionName": "${dag.foreach.current}"},"name": "Reader","category": "reader"},{"stepType": "odps","parameter": {"partition": "col=${dag.foreach.current}","truncate": true,"datasource": "你输出数据表的MaxCompute空间名称","envType": 1,"isSupportThreeModel": false,"tunnelQuota": "default","column": ["你的ODPS表的字段,因为我这里是想要将所有数据放在一个字段,所以这里就只预留了一个字段"],"emptyAsNull": false,"tableComment": "","table": "你的ODPS表","consistencyCommit": false},"name": "Writer","category": "writer"},{"copies": 1,"parameter": {"nodes": [],"edges": [],"groups": [],"version": "2.0"},"name": "Processor","category": "processor"}],"setting": {"errorLimit": {"record": "0"},"locale": "zh_CN","speed": {"throttle": false,"concurrent": 1}},"order": {"hops": [{"from": "Reader","to": "Writer"}]}
}
整个循环流程,点击右侧打开配置进行相关调度配置,最下方需要配置节点上下文 loopDataArray这个参数是读取外部的赋值节点,是必须配置的参数
日志
循环节点无法在dataworks的开发界面直接运营进行测试,只能发布以后在运维中心进行查看
最终效果
后期拓展
这里因为业务需求所以没有循环的参数是通过python print写死输出的
优雅一些的方式就是通过数据表维护,就可以动态读取数据表的内容,然后作为循环参数传入了
相关文档
for-each节点由哪些组成,应用逻辑是什么_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2021, August 18). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/logic-of-for-each-nodes?spm=a2c4g.11186623.4.5.20a4d43aNd6b0E&scm=20140722.H_299261._.ID_299261-OR_rec-V_1#section-50c-r2v-mhd
赋值节点的操作步骤_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2019, September 10). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/configure-an-assignment-node?spm=a2c4g.11186623.0.0.2947b24b0wmXD7#task-2485378
for-each节点由哪些组成,应用逻辑是什么_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2021, August 18). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/logic-of-for-each-nodes?spm=a2c4g.11186623.0.0.45634a14sGs7jS