ElasticJob
基本使用直接参考官网, 有中文. https://shardingsphere.apache.org/elasticjob/current/cn/overview/
ElasticJob 封装
基于 3.0.0-beta 版本, 对 ElasticJob 进行封装, 结合 SpringBoot, 使用 annotation 即可使用.
核心代码注解解析
/**
 * Parses {@code ElasticJobConfig} annotations and starts one {@code ScheduleJobBootstrap} per
 * annotated Spring bean.
 *
 * <p>Parsing is triggered by {@code ApplicationReadyEvent}, i.e. after all Spring beans have been
 * instantiated, so the annotated job beans can be looked up from the application context.
 *
 * @author DevLGQ
 * @version 1.0
 */
public class ElasticJobCfgParser implements ApplicationListener<ApplicationReadyEvent> {

    private static final Logger log = LoggerFactory.getLogger(ElasticJobCfgParser.class);

    /** Spring scope name applied to every generated bootstrap bean definition. */
    public static final String PROTOTYPE = "prototype";

    private final CoordinatorRegistryCenter zookeeperRegistryCenter;

    /**
     * @param zookeeperRegistryCenter registry center passed to every {@code ScheduleJobBootstrap}
     */
    public ElasticJobCfgParser(CoordinatorRegistryCenter zookeeperRegistryCenter) {
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
    }

    /**
     * Finds every bean annotated with {@code ElasticJobConfig}, registers a bootstrap bean for it
     * and starts the job.
     *
     * @param event the ready event carrying the application context to scan
     * @throws IllegalStateException if a bean reported as annotated exposes no annotation
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        ApplicationContext applicationContext = event.getApplicationContext();
        Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
        for (Map.Entry<String, Object> entry : beanMap.entrySet()) {
            String beanName = entry.getKey();
            // Let Spring resolve the annotation: this works for proxied beans and nested classes,
            // unlike truncating the class name at '$' and re-loading it with Class.forName.
            ElasticJobConfig conf = applicationContext.findAnnotationOnBean(beanName, ElasticJobConfig.class);
            if (conf == null) {
                throw new IllegalStateException(
                        "No @ElasticJobConfig annotation found on bean '" + beanName + "'");
            }
            startJob(applicationContext, entry.getValue(), conf);
        }
        log.info("共启动作业数: {}", beanMap.size());
    }

    /** Builds the job configuration shared by all job types from the annotation attributes. */
    private JobConfiguration.Builder newCommonJobConfigBuilder(ElasticJobConfig conf) {
        return JobConfiguration
                .newBuilder(conf.jobName(), conf.shardingTotalCount())
                .shardingItemParameters(conf.shardingItemParameters())
                .description(conf.description())
                .failover(conf.failover())
                .jobParameter(conf.jobParameter())
                .misfire(conf.misfire())
                .cron(conf.cron())
                .overwrite(conf.overwrite())
                .maxTimeDiffSeconds(conf.maxTimeDiffSeconds())
                .reconcileIntervalMinutes(conf.reconcileIntervalMinutes())
                .disabled(conf.disabled())
                .monitorExecution(conf.monitorExecution())
                .jobShardingStrategyType(conf.jobShardingStrategyType())
                .jobErrorHandlerType(conf.jobErrorHandlerType())
                .jobExecutorServiceHandlerType(conf.jobExecutorServiceHandlerType());
    }

    /** Registers a prototype {@code ScheduleJobBootstrap} bean definition for one job and starts it. */
    private void startJob(ApplicationContext applicationContext, Object jobBean, ElasticJobConfig conf) {
        String jobName = conf.jobName();
        String jobType = conf.jobType();
        JobConfiguration.Builder jobCfgBuilder = newCommonJobConfigBuilder(conf);

        BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(ScheduleJobBootstrap.class);
        // Spring invokes schedule() as the init method whenever an instance is created.
        factory.setInitMethodName("schedule")
                .setScope(PROTOTYPE);

        // Constructor argument 1: the registry center.
        factory.addConstructorArgValue(this.zookeeperRegistryCenter);

        // Constructor argument 2: the job itself (a type name for script/http jobs, else the bean).
        if (ElasticJobType.SCRIPT.equals(jobType)) {
            // NOTE(review): a dataflow property is set on a script job here; kept as in the
            // original - confirm whether streamingProcess was meant for dataflow jobs instead.
            jobCfgBuilder
                    .setProperty(DataflowJobProperties.STREAM_PROCESS_KEY, Boolean.toString(conf.streamingProcess()))
                    .setProperty(ScriptJobProperties.SCRIPT_KEY, conf.scriptCommandLine());
            factory.addConstructorArgValue(ElasticJobType.SCRIPT);
        } else if (ElasticJobType.HTTP.equals(jobType)) {
            jobCfgBuilder
                    .setProperty(HttpJobProperties.URI_KEY, conf.httpUrl())
                    .setProperty(HttpJobProperties.METHOD_KEY, conf.httpMethod())
                    .setProperty(HttpJobProperties.DATA_KEY, conf.httpData())
                    .setProperty(HttpJobProperties.CONNECT_TIMEOUT_KEY, conf.httpConnectTimeoutMilliseconds())
                    .setProperty(HttpJobProperties.READ_TIMEOUT_KEY, conf.httpReadTimeoutMilliseconds())
                    .setProperty(HttpJobProperties.CONTENT_TYPE_KEY, conf.httpContentType());
            factory.addConstructorArgValue(ElasticJobType.HTTP);
        } else {
            factory.addConstructorArgValue(jobBean);
        }

        // Constructor argument 3: the finished job configuration.
        factory.addConstructorArgValue(jobCfgBuilder.build());

        DefaultListableBeanFactory beanFactory =
                (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
        String registerBeanName = jobName + ".ScheduleJobBootstrap";
        beanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());

        // A prototype bean is only created on request. getBean() instantiates it and runs the
        // "schedule" init method, so schedule() must not be called a second time here.
        applicationContext.getBean(registerBeanName);
        log.info("启动 elastic-job 作业:{}", jobName);
    }
}
完整代码可以在我的Git仓库查看.
ElasticJob 管理后台安装
Zookeeper 安装
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
mv apache-zookeeper-3.6.2-bin /usr/local/zookeeper-3.6.2
cd /usr/local/zookeeper-3.6.2/conf
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
mkdir /etc/zookeeper/data -p
mkdir /etc/zookeeper/data/log -p
# 进入 bin 目录 (zkServer.sh 和 zkCli.sh 都在该目录下)
cd /usr/local/zookeeper-3.6.2/bin
# 后台启动
./zkServer.sh start
# 前台启动
./zkServer.sh start-foreground
# 使用客户端连接 zookeeper
./zkCli.sh -server 192.168.123.26:2181
# 一些常用指令
# 查看节点状态
stat /
# 查看指定路径下的节点
ls /
# 获取节点信息
get /node
# 退出客户端
quit
旧版本编译使用 elastic-job-lite-console
# 使用 2.1.5 版本
# Caused by: java.lang.ClassNotFoundException: com.sun.tools.javac.code.TypeTags
# 修改 lombok 版本
# Caused by: java.io.IOException: The javadoc executable 'D:\Program Files\Java\jdk-11.0.9+11\..\bin\javadoc.exe' doesn't exist or is not a file. Verify the <javadocExecutable/> parameter.
# 关闭 doc 生成
mvn install -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -X
# 编译后在 elastic-job-lite\elastic-job-lite-console\target 文件夹下面
# auth.properties 认证配置文件
# 默认端口 8899
添加注册中心, 注意 namespace 的添加.
出现数据库连接不上的情况, 可能是因为使用了 MySQL 8.0, 需要修改源码, 把数据库驱动改为 com.mysql.cj.jdbc.Driver.
新版本使用 shardingsphere-elasticjob-ui
拉取源码
git clone https://github.com.cnpmjs.org/apache/shardingsphere-elasticjob-ui.git
cd shardingsphere-elasticjob-ui
# 添加 Event Trace DataSource,两种方式
# 第一种 修改 shardingsphere-elasticjob-lite-ui/shardingsphere-elasticjob-lite-ui-backend/pom.xml 文件,添加 jdbc driver
# 第二种 在 ext-lib 目录下面添加 jdbc driver mysql-connector-java-8.0.11.jar
mvn clean package -Prelease
# 编译目标在 shardingsphere-elasticjob-ui/shardingsphere-elasticjob-ui-distribution/shardingsphere-elasticjob-lite-ui-bin-distribution/target/apache-shardingsphere-${latest.release.version}-shardingsphere-elasticjob-lite-ui-bin.tar.gz
# 编译过程中遇到的问题
# Caused by: java.lang.ClassNotFoundException: com.sun.tools.javac.code.TypeTags
# lombok 版本过低,不支持高版本的 java,提升版本号即可.
# 程序包javax.xml.bind.annotation不存在
# jdk 版本修改为 1.8
# 进入目录 启动 注意启动环境要使用 jdk 1.8
# 去掉shell脚本的 \r
sed -i 's/\r//' bin/start.sh
bin/start.sh
# 访问 192.168.123.197:8899
解决 ERROR: Failed to download Chromium r686378! Set "PUPPETEER_SKIP_CHROMIUM_DOWNLOAD" 错误和 npm 下载慢的问题 (在 pom.xml 中添加以下 execution):
<!-- Runs "npm config set registry ..." so npm downloads packages from the Taobao mirror registry. -->
<!-- NOTE(review): the enclosing plugin is not shown here; these executions presumably belong to the
     front-end build plugin's <executions> section - confirm against the pom.xml. -->
<execution>
<id>npm config set</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>config set registry https://registry.npm.taobao.org</arguments>
</configuration>
</execution>
<!-- Runs "npm config set puppeteer_download_host=..." so the Chromium download required by
     puppeteer is fetched from the mirror host instead of the default one. -->
<execution>
<id>npm config puppeteer</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>config set puppeteer_download_host=https://npm.taobao.org/mirrors</arguments>
</configuration>
</execution>