背景
目前在职的公司,维护着Spring Cloud分布式微服务项目有25+个。其中有10个左右微服务都写有定时任务逻辑,采用Spring @Scheduled这种方式。
Spring @Scheduled定时任务的缺点:
- 不支持集群:为避免重复执行,需引入分布式锁
- 死板不灵活:不支持手动执行,单次执行,补偿执行,修改任务参数,暂停任务,删除任务,修改调度时间,失败重试
- 无报警机制:任务失败之后没有报警机制,逻辑执行异常记录ERROR日志接入Prometheus告警这种方式不算,这算是日志层面的告警,而不是任务层面的告警机制
- 不支持分片任务:处理有序数据时,多机器分片执行任务处理不同数据
- ……
基于此,考虑引入轻量级分布式定时调度框架XXL-JOB,即把定时任务迁到XXL-JOB平台。
关于XXL-JOB,可参考之前的blog。
设计方案
考虑到我们有10+个SC分布式应用,30+个定时任务。如果每个应用都需要迁移改造的话,则每个应用都需要配置XXL-JOB相关的信息。当然,这可以通过Apollo namespace共享继承机制来实现。题外话:有空的话,后面会写一篇Apollo namespace配置继承的blog。
也就是说,我可以在一个应用里(一个应用对应着一个Apollo namespace)的Apollo里维护好XXL-JOB的配置信息,其他应用通过复用此应用(的Apollo)来实现配置复用。
但是每个应用还得新增一个配置类,配置类怎么实现复用呢?这也能解决。解决方案就是在commons组件库里维护配置类(需要引入Spring @Configuration注解,即引入spring-context
依赖包),然后每个应用的Spring Boot启动类里需要扫描到此配置类。
还得改造一下30+个定时任务对应的30+个@@Component定时任务类,所有的定时任务应用都需要引入maven依赖。
还得手动在XXL-JOB里新增定时任务类。
看起来还不错的方案,但是不排除不同的应用有同名的配置,遇到同名的配置,则需要修改配置命名。Spring Boot启动类改造可能会带来未知的问题。
最后的最后,考虑到我们所有的应用都需要经过Gateway网关服务来转发,不管是对内的应用,还是对外的应用,对外的应用有包括C端,B端,和第三方客户。故而有下面的最终方案。
实现方案
在对内的网关应用里,引入maven依赖:
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.4.0</version>
</dependency>
新增如下XXL-JOB配置类:
@Slf4j
@Configuration
public class XxlJobConfig {@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.executor.appname}")private String appName;@Value("${xxl.job.executor.port:9999}")private int port;@Value("${xxl.job.accessToken:default_token}")private String accessToken;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {log.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor executor = new XxlJobSpringExecutor();executor.setAdminAddresses(adminAddresses);executor.setAppname(appName);executor.setPort(port);executor.setAccessToken(accessToken);return executor;}
}
对应的,需要在Apollo里新增如下配置。其中有些配置是固定不变的,可以放在本地配置文件里;未来有可能变化的,放在Apollo里。
这里的appname实际上就是XXL-JOB的执行器:
gateway服务是以pod形式运行在k8s集群里,不言而喻,采用自动注册这种方式。
网关服务里新增定时任务解析,请求转发配置类:
@Slf4j
@Component
public class XxlJobLogicConfig {private static final String URL = "url:";private static final String METHOD = "method:";private static final String DATA = "data:";private static final String GET = "GET";private static final String POST = "POST";@XxlJob("httpJobHandler")public void httpJobHandler() {// 参数解析及校验String jobParam = XxlJobHelper.getJobParam();if (StringUtils.isBlank(jobParam)) {XxlJobHelper.log("param[" + jobParam + "] invalid");XxlJobHelper.handleFail();return;}String[] httpParams = jobParam.split("\n");String url = "";String method = "";String data = "null";for (String httpParam : httpParams) {if (httpParam.startsWith(URL)) {url = httpParam.substring(httpParam.indexOf(URL) + URL.length()).trim();}if (httpParam.startsWith(METHOD)) {method = httpParam.substring(httpParam.indexOf(METHOD) + METHOD.length()).trim().toUpperCase();}if (httpParam.startsWith(DATA)) {data = httpParam.substring(httpParam.indexOf(DATA) + DATA.length()).trim();}}if (StringUtils.isBlank(url)) {XxlJobHelper.log("url[" + url + "] invalid");XxlJobHelper.handleFail();return;}if (!GET.equals(method) && !POST.equals(method)) {XxlJobHelper.log("method[" + method + "] invalid");XxlJobHelper.handleFail();return;}log.info("xxlJob调度请求url={},请求method={},请求数据data={}", url, method, data);// 判断是否为POST请求boolean isPostMethod = POST.equals(method);HttpURLConnection connection = null;BufferedReader bufferedReader = null;try {URL realUrl = new URL(url);connection = (HttpURLConnection) realUrl.openConnection();// 设置具体的方法,也就是具体的定时任务connection.setRequestMethod(method);// POST请求需要outputconnection.setDoOutput(isPostMethod);connection.setDoInput(true);connection.setUseCaches(false);connection.setReadTimeout(900 * 1000);connection.setConnectTimeout(600 * 1000);// connection:Keep-Alive 表示在一次http请求中,服务器进行响应后,不再直接断开TCP连接,而是将TCP连接维持一段时间。// 在这段时间内,如果同一客户端再次向服务端发起http请求,便可以复用此TCP连接,向服务端发起请求。connection.setRequestProperty("connection", "keep_alive");// Content-Type 表示客户端向服务端发送的数据的媒体类型(MIME类型)connection.setRequestProperty("content-type", "application/json;charset=UTF-8");// Accept-Charset 表示客户端希望服务端返回的数据的媒体类型(MIME类型)connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");// gateway请求转发到其他应用connection.connect();// 如果是POST请求,则判断定时任务是否含有执行参数if (isPostMethod && StringUtils.isNotBlank(data)) {DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());// 写参数dataOutputStream.write(data.getBytes(Charset.defaultCharset()));dataOutputStream.flush();dataOutputStream.close();}int responseCode = connection.getResponseCode();// 判断请求转发、定时任务触发是否成功if (responseCode != 200) {throw new RuntimeException("Http Request StatusCode(" + responseCode + ") Invalid");}bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), Charset.defaultCharset()));StringBuilder stringBuilder = new StringBuilder();String line;while ((line = bufferedReader.readLine()) != null) {stringBuilder.append(line);}String responseMsg = stringBuilder.toString();log.info("xxlJob调度执行返回数据={}", responseMsg);XxlJobHelper.log(responseMsg);} catch (Exception e) {XxlJobHelper.log(e);XxlJobHelper.handleFail();} finally {try {if (bufferedReader != null) {bufferedReader.close();}if (connection != null) {connection.disconnect();}} catch (Exception e) {XxlJobHelper.log(e);}}}
}
稍微有点麻烦的是,每个Spring Cloud应用都需要手动新增一个ScheduleController:
/*** 定时任务入口,所有服务的@RequestMapping满足/schedule/appName这种格式,方便统一管理**/
@RestController
@RequestMapping("/schedule/search")
public class ScheduleController {@Resourceprivate ChineseEnglishStoreSchedule chineseEnglishStoreSchedule;@GetMapping("/chineseEnglishStoreSchedule")public Response<Boolean> chineseEnglishStoreSchedule() {chineseEnglishStoreSchedule.execute();return Response.success(true);}
}
另外,需要在gateway网关服务里新增路由转发规则:
每个有定时任务,且准备接入XXL-JOB平台的SC微服务,都需要新增类似上面截图里的4条配置信息。
优点:所有带有定时任务的服务一目了然,方便统一维护和管理。
这种方案无需改造具体的某个Schedule类:
@JobHander(value = "autoJobHandler")
public class AutoJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String... params) {try {// 既有的业务逻辑// 执行成功return ReturnT.SUCCESS;} catch (Exception e) {logger.error("execute error id:{}, error info:{}", id, e);return ReturnT.FAIL;}return ReturnT.SUCCESS;}
}
最后都省却不了的一个步骤,在XXL-JOB admin管理平台新增一个个任务:
验证
任务调度的执行日志:
ELK日志查询平台里也可以搜索到逻辑代码里打印的日志。