Using DataX with Python 3

DataX itself is written in Python 2, so running it under Python 3 throws all kinds of syntax errors. The problem is that projects these days are all built on Python 3, which makes this genuinely painful...
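
For reference, here is a quick summary (mine, not from the original post) of the Python 2 constructs that appear in the stock scripts and what the Python 3 versions below replace them with:

import sys

# Python 2 constructs in the stock DataX scripts that break under Python 3:
#   print >> sys.stderr, "msg"                   -> SyntaxError: print is a function now
#   raw_input()                                  -> NameError: renamed to input()
#   reload(sys); sys.setdefaultencoding('utf8')  -> removed; Python 3 str is already Unicode
#   urllib.urlopen(url)                          -> moved to urllib.request.urlopen()
#   tasklist.sort(compare)                       -> cmp-style sort removed; use key=/cmp_to_key
#
# The replacements below therefore use the function form, e.g.:
print("msg", file=sys.stderr)  # Python 3 way to print to stderr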

I found a post online that solves the problem:

Original post: "python3执行datax报错问题解决办法" by 卷心菜的奇妙历险 on 博客园 (cnblogs)

I'm recording it here again in case the original post becomes unreachable — frustrating, but posts I relied on have suddenly disappeared before...

The fix

Under the datax/bin/ directory, replace the following three files. Back up the originals first:
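
For example (assuming DataX lives under /opt/datax — adjust to your install path):

cd /opt/datax/bin
cp datax.py datax.py.bak
cp dxprof.py dxprof.py.bak
cp perftrace.py perftrace.py.bak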

1. datax.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform


def isWindows():
    return platform.system() == 'Windows'


DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'

if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}


def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


def suicide(signum, e):
    global child_process
    # Python 3 fix: "print >> sys.stderr, ..." is Python 2 syntax; print is a function now
    print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader", type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer", type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser


def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader, reader, reader)
    writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer, writer, writer)
    print(readerRef)
    print(writerRef)
    jobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print(jobGuid)
    jobTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "content": [
                {
                    "reader": {},
                    "writer": {}
                }
            ]
        }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    try:
        readerPar = readPluginTemplate(readerTemplatePath)
    except Exception as e:
        print("Read reader[%s] template error: can't find file %s" % (reader, readerTemplatePath))
    try:
        writerPar = readPluginTemplate(writerTemplatePath)
    except Exception as e:
        print("Read writer[%s] template error: : can't find file %s" % (writer, writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar
    jobTemplate['job']['content'][0]['writer'] = writerPar
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))


def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
        return json.load(f)


def isUrl(path):
    if not path:
        return False
    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters
    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())
    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))
    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params
    if options.jobid:
        commandMap["jobid"] = options.jobid
    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource
    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print('''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
''' % DATAX_VERSION)
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])
    startCommand = buildStartCommand(options, args)
    # print startCommand
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()
    sys.exit(child_process.returncode)
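
With the replaced datax.py in place, both job execution and the -r/-w template mode run cleanly under Python 3. For example, printing a skeleton job config (plugin names must match plugins installed under datax/plugin/):

python3 datax.py -r streamreader -w streamwriter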

2. dxprof.py

#! /usr/bin/env python
# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:

import re
import sys
import time
# Python 3 fix: list.sort() no longer accepts a raw comparator, wrap it with cmp_to_key
from functools import cmp_to_key

REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)


# {{{ function parse_timestamp() #
def parse_timestamp(line):
    try:
        ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
    except:
        ts = 0
    return ts
# }}} #


# {{{ function parse_query_host() #
def parse_query_host(line):
    ori = REG_SQL_JDBC.search(line)
    if (not ori):
        return ''
    ori = ori.group(1).split('?')[0]
    off = ori.find('@')
    if (off > -1):
        ori = ori[off + 1:len(ori)]
    else:
        off = ori.find('//')
        if (off > -1):
            ori = ori[off + 2:len(ori)]
    return ori.lower()
# }}} #


# {{{ function parse_query_table() #
def parse_query_table(line):
    ori = REG_SQL_PATH.search(line)
    return (ori and ori.group(1).lower()) or ''
# }}} #


# {{{ function parse_reader_task() #
def parse_task(fname):
    global LAST_SQL_UUID
    global LAST_COMMIT_UUID
    global DATAX_JOBDICT
    global DATAX_JOBDICT_COMMIT
    global UNIXTIME
    LAST_SQL_UUID = ''
    DATAX_JOBDICT = {}
    LAST_COMMIT_UUID = ''
    DATAX_JOBDICT_COMMIT = {}
    UNIXTIME = int(time.time())
    with open(fname, 'r') as f:
        for line in f.readlines():
            line = line.strip()
            if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
                LAST_SQL_UUID = ''
            if line.find('CommonRdbmsReader$Task') > 0:
                parse_read_task(line)
            elif line.find('commit blocks') > 0:
                parse_write_task(line)
            else:
                continue
# }}} #


# {{{ function parse_read_task() #
def parse_read_task(line):
    # global is needed so parse_task() can back-fill the host from the following log line
    global LAST_SQL_UUID
    ser = REG_SQL_UUID.search(line)
    if not ser:
        return
    LAST_SQL_UUID = ser.group()
    if REG_SQL_WAKE.search(line):
        DATAX_JOBDICT[LAST_SQL_UUID] = {
            'stat': 'R',
            'wake': parse_timestamp(line),
            'done': UNIXTIME,
            'host': parse_query_host(line),
            'path': parse_query_table(line)
        }
    elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
        DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
        DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
# }}} #


# {{{ function parse_write_task() #
def parse_write_task(line):
    global LAST_COMMIT_UUID
    ser = REG_COMMIT_UUID.search(line)
    if not ser:
        return
    LAST_COMMIT_UUID = ser.group()
    if REG_COMMIT_WAKE.search(line):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
            'stat': 'R',
            'wake': parse_timestamp(line),
            'done': UNIXTIME,
        }
    elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
# }}} #


# {{{ function result_analyse() #
def result_analyse():
    def compare(a, b):
        return b['cost'] - a['cost']

    tasklist = []
    hostsmap = {}
    statvars = {'sum': 0, 'cnt': 0, 'svr': 0, 'max': 0, 'min': int(time.time())}
    tasklist_commit = []
    statvars_commit = {'sum': 0, 'cnt': 0}

    for idx in DATAX_JOBDICT:
        item = DATAX_JOBDICT[idx]
        item['uuid'] = idx
        item['cost'] = item['done'] - item['wake']
        tasklist.append(item)
        if (not (item['host'] in hostsmap)):
            hostsmap[item['host']] = 1
            statvars['svr'] += 1
        if (item['cost'] > -1 and item['cost'] < 864000):
            statvars['sum'] += item['cost']
            statvars['cnt'] += 1
            statvars['max'] = max(statvars['max'], item['done'])
            statvars['min'] = min(statvars['min'], item['wake'])

    for idx in DATAX_JOBDICT_COMMIT:
        itemc = DATAX_JOBDICT_COMMIT[idx]
        itemc['uuid'] = idx
        itemc['cost'] = itemc['done'] - itemc['wake']
        tasklist_commit.append(itemc)
        if (itemc['cost'] > -1 and itemc['cost'] < 864000):
            statvars_commit['sum'] += itemc['cost']
            statvars_commit['cnt'] += 1

    ttl = (statvars['max'] - statvars['min']) or 1
    idx = float(statvars['cnt']) / (statvars['sum'] or ttl)
    # Python 3 fix: sort(cmp) is gone, wrap the comparator with cmp_to_key
    tasklist.sort(key=cmp_to_key(compare))
    for item in tasklist:
        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' % (
            item['stat'], item['host'], item['path'],
            time.strftime('%H:%M:%S', time.localtime(item['wake'])),
            (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
            item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))

    if (not len(tasklist) or not statvars['cnt']):
        return

    print('\n--- DataX Profiling Statistics ---')
    print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' % (
        statvars['cnt'], statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
    print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' % (
        ttl,
        time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
        time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
        float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))

    idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
    tasklist_commit.sort(key=cmp_to_key(compare))
    print('%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
        statvars_commit['cnt'], statvars_commit['sum'],
        float(statvars_commit['sum']) / statvars_commit['cnt'],
        idx_commit * tasklist_commit[0]['cost']))
# }}} #


if (len(sys.argv) < 2):
    print("Usage: %s filename" % (sys.argv[0]))
    sys.exit(1)
else:
    parse_task(sys.argv[1])
    result_analyse()
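
dxprof.py stands apart from the other two: it post-processes a finished DataX run by scanning the job log for the reader/writer lines matched by the regexes above, then prints per-task costs and summary statistics. A typical run (the log path is only an example — point it at your own DataX job log):

python3 dxprof.py /opt/datax/log/your-datax-job.log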

3. perftrace.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-

"""
Life's short, Python more.
"""

import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser

# Python 3 fix: the original Python 2 header did
#     reload(sys); sys.setdefaultencoding('utf8')
# both are gone in Python 3 (str is already Unicode), so they are simply dropped.


##begin cli & help logic
def getOptionParser():
    usage = getUsage()
    parser = OptionParser(usage=usage)
    # rdbms reader and writer
    parser.add_option('-r', '--reader', action='store', dest='reader',
                      help='trace datasource read performance with specified !json! string')
    parser.add_option('-w', '--writer', action='store', dest='writer',
                      help='trace datasource write performance with specified !json! string')
    parser.add_option('-c', '--channel', action='store', dest='channel', default='1',
                      help='the number of concurrent sync thread, the default is 1')
    parser.add_option('-f', '--file', action='store',
                      help='existing datax configuration file, include reader and writer params')
    parser.add_option('-t', '--type', action='store', default='reader',
                      help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')
    parser.add_option('-d', '--delete', action='store', default='true',
                      help='delete temporary files, the default value is true')
    # parser.add_option('-h', '--help',   action='store', default='true', help='print usage information')
    return parser


def getUsage():
    return '''
The following params are available for -r --reader:
    [these params is for rdbms reader, used to trace rdbms read performance, it's like datax's key]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for read data
    column:          column to be read, the default value is ['*']
    splitPk:         the splitPk column of rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to be fetched at each communicate

    [these params is for stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount:  how man test data to mock(each channel), the default value is 10000
    reader-column          :  stream reader while generate test data(type supports: string|long|date|double|bool|bytes; support constant value and random function),demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

The following params are available for -w --writer:
    [these params is for rdbms writer, used to trace rdbms write performance, it's like datax's key]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for write data
    column:          column to be writed, the default value is ['*']
    batchSize:       how many rows to be storeed at each communicate, the default value is 512
    preSql:          prepare sql to be executed before write data, the default value is ''
    postSql:         post sql to be executed end of write data, the default value is ''
    url:             required for ads, pattern is ip:port
    schme:           required for ads, ads database name

    [these params is for stream writer, used to trace rdbms read performance]
    writer-print:           true means print data read from source datasource, the default value is false

The following params are available global control:
    -c --channel:    the number of concurrent tasks, the default value is 1
    -f --file:       existing completely dataX configuration file path
    -t --type:       test read or write performance for a datasource, couble be reader or writer, in collaboration with -f --file
    -h --help:       print help message

some demo:
    perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
    perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
    perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
    perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

some example jdbc url pattern, may help:
    jdbc:oracle:thin:@ip:port:database
    jdbc:mysql://ip:port/database
    jdbc:sqlserver://ip:port;DatabaseName=database
    jdbc:postgresql://ip:port/database
    warn: ads url pattern is ip:port
    warn: test write performance will write data into your table, you can use a temporary table just for test.
'''


def printCopyright():
    DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
    print('''
DataX Util Tools (%s), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION)
    sys.stdout.flush()


def yesNoChoice():
    yes = set(['yes', 'y', 'ye', ''])
    no = set(['no', 'n'])
    # Python 3 fix: raw_input() was renamed to input()
    choice = input().lower()
    if choice in yes:
        return True
    elif choice in no:
        return False
    else:
        sys.stdout.write("Please respond with 'yes' or 'no'")
##end cli & help logic


##begin process logic
def suicide(signum, e):
    global childProcess
    # Python 3 fix: print is a function; "print >> sys.stderr" is Python 2 syntax
    print("[Error] Receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
    if childProcess:
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(-1)


def registerSignal():
    global childProcess
    signal.signal(2, suicide)
    signal.signal(3, suicide)
    signal.signal(15, suicide)


def fork(command, isShell=False):
    global childProcess
    childProcess = subprocess.Popen(command, shell=isShell)
    registerSignal()
    # communicate() blocks until the child process exits
    (stdout, stderr) = childProcess.communicate()
    childProcess.wait()
    return childProcess.returncode
##end process logic


##begin datax json generate logic
# warn: if not '': -> true;   if not None: -> true
def notNone(obj, context):
    if not obj:
        raise Exception("Configuration property [%s] could not be blank!" % (context))


def attributeNotNone(obj, attributes):
    for key in attributes:
        notNone(obj.get(key), key)


def isBlank(value):
    if value is None or len(value.strip()) == 0:
        return True
    return False


def parsePluginName(jdbcUrl, pluginType):
    import re
    # warn: drds
    name = 'pluginName'
    mysqlRegex = re.compile('jdbc:(mysql)://.*')
    if (mysqlRegex.match(jdbcUrl)):
        name = 'mysql'
    postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
    if (postgresqlRegex.match(jdbcUrl)):
        name = 'postgresql'
    oracleRegex = re.compile('jdbc:(oracle):.*')
    if (oracleRegex.match(jdbcUrl)):
        name = 'oracle'
    sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
    if (sqlserverRegex.match(jdbcUrl)):
        name = 'sqlserver'
    db2Regex = re.compile('jdbc:(db2)://.*')
    if (db2Regex.match(jdbcUrl)):
        name = 'db2'
    return "%s%s" % (name, pluginType)


def renderDataXJson(paramsDict, readerOrWriter='reader', channel=1):
    dataxTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "",
                        "parameter": {
                            "username": "",
                            "password": "",
                            "sliceRecordCount": "10000",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": []
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "",
                        "parameter": {
                            "print": "false",
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": ''
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    dataxTemplate['job']['setting']['speed']['channel'] = channel
    dataxTemplateContent = dataxTemplate['job']['content'][0]
    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'
    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
    dataxPluginParamsContent.update(paramsDict)
    dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')
    if readerOrWriter == 'reader':
        dataxTemplateContent.get('reader')['name'] = pluginName
        dataxTemplateContent.get('writer')['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
            del dataxPluginParamsContent['writer-print']
        del dataxPluginParamsContentOtherSide['connection']
    if readerOrWriter == 'writer':
        dataxTemplateContent.get('reader')['name'] = 'streamreader'
        dataxTemplateContent.get('writer')['name'] = pluginName
        if paramsDict.get('reader-column'):
            dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
            del dataxPluginParamsContent['reader-column']
        if paramsDict.get('reader-sliceRecordCount'):
            dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
            del dataxPluginParamsContent['reader-sliceRecordCount']
        del dataxPluginParamsContentOtherSide['connection']
    if paramsDict.get('jdbcUrl'):
        if readerOrWriter == 'reader':
            dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])
    traceJobJson = json.dumps(dataxTemplate, indent=4)
    return traceJobJson


def isUrl(path):
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('Configuration file path required for the string, you configure is:%s' % path)
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def readJobJsonFromLocal(jobConfigPath):
    jobConfigContent = None
    jobConfigPath = os.path.abspath(jobConfigPath)
    file = open(jobConfigPath)
    try:
        jobConfigContent = file.read()
    finally:
        file.close()
    if not jobConfigContent:
        raise Exception("Your job configuration file read the result is empty, please check the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent


def readJobJsonFromRemote(jobConfigPath):
    # Python 3 fix: urllib.urlopen() moved to urllib.request.urlopen();
    # decode the response bytes to str for json.loads
    import urllib.request
    conn = urllib.request.urlopen(jobConfigPath)
    jobJson = conn.read().decode('utf-8')
    return jobJson


def parseJson(strConfig, context):
    try:
        return json.loads(strConfig)
    except Exception as e:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        # Python 3 fix: print(..., file=sys.stderr) replaces "print >> sys.stderr"
        print('%s %s need in line with json syntax' % (context, strConfig), file=sys.stderr)
        sys.exit(-1)


def convert(options, args):
    traceJobJson = ''
    if options.file:
        if isUrl(options.file):
            traceJobJson = readJobJsonFromRemote(options.file)
        else:
            traceJobJson = readJobJsonFromLocal(options.file)
        traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
        attributeNotNone(traceJobDict, ['job'])
        attributeNotNone(traceJobDict['job'], ['content'])
        attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
        attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
        attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
        if options.type == 'reader':
            traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
            if options.reader:
                traceReaderDict = parseJson(options.reader, 'reader config')
                if traceReaderDict.get('writer-print') is not None:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            else:
                traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
        elif options.type == 'writer':
            traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
            if options.writer:
                traceWriterDict = parseJson(options.writer, 'writer config')
                if traceWriterDict.get('reader-column'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                if traceWriterDict.get('reader-sliceRecordCount'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
            else:
                columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                streamReaderColumn = []
                for i in range(columnSize):
                    streamReaderColumn.append({"type": "long", "random": "2,10"})
                traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
        else:
            pass  # do nothing
        return json.dumps(traceJobDict, indent=4)
    elif options.reader:
        traceReaderDict = parseJson(options.reader, 'reader config')
        return renderDataXJson(traceReaderDict, 'reader', options.channel)
    elif options.writer:
        traceWriterDict = parseJson(options.writer, 'writer config')
        return renderDataXJson(traceWriterDict, 'writer', options.channel)
    else:
        print(getUsage())
        sys.exit(-1)
    # dataxParams = {}
    # for opt, value in options.__dict__.items():
    #     dataxParams[opt] = value
##end datax json generate logic


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    # print options, args
    dataxTraceJobJson = convert(options, args)

    # uuid1() combines the MAC address, current timestamp and a random number,
    # so the generated file name is effectively unique
    dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
    jobConfigOk = True
    if os.path.exists(dataxJobPath):
        print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
        if yesNoChoice():
            jobConfigOk = True
        else:
            print("exit failed, because of file conflict")
            sys.exit(-1)
    fileWriter = open(dataxJobPath, 'w')
    fileWriter.write(dataxTraceJobJson)
    fileWriter.close()

    print("trace environments:")
    print("dataxJobPath:  %s" % dataxJobPath)
    dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print("dataxHomePath: %s" % dataxHomePath)
    dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
    print("dataxCommand:  %s" % dataxCommand)

    returncode = fork(dataxCommand, True)
    if options.delete == 'true':
        os.remove(dataxJobPath)
    sys.exit(returncode)
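
One thing worth noting from the code above: perftrace.py writes the generated job JSON to a perftrace-<uuid> file in the current directory, then shells out to bin/datax.py under the same DataX home — so replace datax.py first, or the trace itself dies with the old syntax errors. A minimal run in the same shape as the built-in demos (connection values are placeholders):

python3 perftrace.py --channel=1 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"user", "password":"pass", "table":"t1"}'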

OK — swap these three files for the Python 3 versions above, and DataX works normally again.
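
A quick smoke test, assuming a stock install at /opt/datax (job/job.json is the stream-to-stream self-check job that ships with DataX):

python3 /opt/datax/bin/datax.py /opt/datax/job/job.json

If the replacement worked, you get the normal copyright banner and a job summary instead of a SyntaxError.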
