抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

ElasticJob

基本使用直接参考官网, 有中文. https://shardingsphere.apache.org/elasticjob/current/cn/overview/

ElasticJob 封装

基于3.0.0-beta版本, 对 ElasticJob 进行封装, 结合 SpringBoot, 使用annotation即可使用.

核心代码注解解析

/**
 * 解析es-job配置。要考虑在什么时机进行解析。在所有的spring bean都实例化完成之后
 *
 * @author DevLGQ
 * @version 1.0
 */
public class ElasticJobCfgParser implements ApplicationListener<ApplicationReadyEvent> {

    private static final Logger log = LoggerFactory.getLogger(ElasticJobCfgParser.class);
    public static final String PROTOTYPE = "prototype";
    private final CoordinatorRegistryCenter zookeeperRegistryCenter;

    public ElasticJobCfgParser(CoordinatorRegistryCenter zookeeperRegistryCenter) {
        this.zookeeperRegistryCenter = zookeeperRegistryCenter;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {

        ApplicationContext applicationContext = event.getApplicationContext();
        Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
        beanMap.forEach((key, value) -> {
            Class<?> clz = value.getClass();
            // 如果类中有内部类
            int outClzIndex = clz.getName().indexOf("$");
            if (outClzIndex > 0) {
                String clzName = clz.getName();
                try {
                    clz = Class.forName(clzName.substring(0, outClzIndex));
                } catch (ClassNotFoundException e) {
                    log.error("elastic job 启动异常, 系统强制退出", e);
                    System.exit(1);
                }
            }

            // 获取配置项 ElasticJobConfig
            ElasticJobConfig conf = clz.getAnnotation(ElasticJobConfig.class);
            // 获取配置内容
            String jobType = conf.jobType();
            String jobName = conf.jobName();
            String cron = conf.cron();
            String jobParameter = conf.jobParameter();
            String shardingItemParameters = conf.shardingItemParameters();
            String description = conf.description();
            String jobErrorHandlerType = conf.jobErrorHandlerType();
            String jobExecutorServiceHandlerType = conf.jobExecutorServiceHandlerType();
            String jobShardingStrategyType = conf.jobShardingStrategyType();

            boolean failover = conf.failover();
            boolean misfire = conf.misfire();
            boolean overwrite = conf.overwrite();
            boolean disabled = conf.disabled();
            boolean monitorExecution = conf.monitorExecution();

            int shardingTotalCount = conf.shardingTotalCount();
            int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
            int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();

            // 实例化 es_job 相关的通用配置
            JobConfiguration.Builder jobCfgBuilder = JobConfiguration
                    .newBuilder(jobName, shardingTotalCount)
                    .shardingItemParameters(shardingItemParameters)
                    .description(description)
                    .failover(failover)
                    .jobParameter(jobParameter)
                    .misfire(misfire)
                    .cron(cron)
                    .overwrite(overwrite)
                    .maxTimeDiffSeconds(maxTimeDiffSeconds)
                    .reconcileIntervalMinutes(reconcileIntervalMinutes)
                    .disabled(disabled)
                    .monitorExecution(monitorExecution)
                    .jobShardingStrategyType(jobShardingStrategyType)
                    .jobErrorHandlerType(jobErrorHandlerType)
                    .jobExecutorServiceHandlerType(jobExecutorServiceHandlerType);

            // 创建一个Spring的beanDefinition
            BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(ScheduleJobBootstrap.class);
            // 多例
            factory.setInitMethodName("schedule")
                    .setScope(PROTOTYPE);

            // 1.添加注册中心
            factory.addConstructorArgValue(this.zookeeperRegistryCenter);

            // 2. 添加自己的真实的任务实现类, 判断是什么类型的
            if (ElasticJobType.SCRIPT.equals(jobType)) {
                // 脚本任务
                boolean streamingProcess = conf.streamingProcess();
                String scriptCommandLine = conf.scriptCommandLine();
                jobCfgBuilder
                        .setProperty(DataflowJobProperties.STREAM_PROCESS_KEY, Boolean.toString(streamingProcess))
                        .setProperty(ScriptJobProperties.SCRIPT_KEY, scriptCommandLine);
                factory.addConstructorArgValue(ElasticJobType.SCRIPT);

            } else if (ElasticJobType.HTTP.equals(jobType)) {
                // http任务
                String httpUrl = conf.httpUrl();
                String httpMethod = conf.httpMethod();
                String httpData = conf.httpData();
                String httpConnectTimeoutMilliseconds = conf.httpConnectTimeoutMilliseconds();
                String httpReadTimeoutMilliseconds = conf.httpReadTimeoutMilliseconds();
                String httpContentType = conf.httpContentType();
                jobCfgBuilder
                        .setProperty(HttpJobProperties.URI_KEY, httpUrl)
                        .setProperty(HttpJobProperties.METHOD_KEY, httpMethod)
                        .setProperty(HttpJobProperties.DATA_KEY, httpData)
                        .setProperty(HttpJobProperties.CONNECT_TIMEOUT_KEY, httpConnectTimeoutMilliseconds)
                        .setProperty(HttpJobProperties.READ_TIMEOUT_KEY, httpReadTimeoutMilliseconds)
                        .setProperty(HttpJobProperties.CONTENT_TYPE_KEY, httpContentType);
                factory.addConstructorArgValue(ElasticJobType.HTTP);
            } else {
                factory.addConstructorArgValue(value);
            }

            // 3. 添加配置
            factory.addConstructorArgValue(jobCfgBuilder.build());

            // 把 factory 也就是 ScheduleJobBootstrap 注入到 Spring 容器中
            DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
            String registerBeanName = conf.jobName() + ".ScheduleJobBootstrap";
            defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());

            ScheduleJobBootstrap scheduler = (ScheduleJobBootstrap) applicationContext.getBean(registerBeanName);
            scheduler.schedule();

            log.info("启动 elastic-job 作业:{}", jobName);

        });

        log.info("共启动作业数: {}", beanMap.values().size());

    }

}

完整代码可以在我的Git仓库查看.

ElasticJob 管理后台安装

Zookeeper 安装

wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz
mv apache-zookeeper-3.6.2 /usr/local/zookeeper-3.6.2
cd /usr/local/zookeeper-3.6.2/conf
mv zoo_simple.cfg zoo.cfg
vim zoo.cfg
mkdir /etc/zookeeper/data -p
mkdir /etc/zookeeper/data/log -p

# 后台启动
./zkServer.sh start
# 前台启动
./zkServer.sh start-foreground

# 使用客户端连接 zookeeper
./zkCli.sh -server 192.168.123.26:2181

# 一些常用指令
# 查看节点状态
stat /
# 查看指定路径下的节点
ls /
# 获取节点信息
get /node
# 退出客户端
quit

旧版本编译使用 elastic-job-lite-console

# 使用 2.1.5 版本
# Caused by: java.lang.ClassNotFoundException: com.sun.tools.javac.code.TypeTags
# 修改 lombok 版本
# Caused by: java.io.IOException: The javadoc executable 'D:\Program Files\Java\jdk-11.0.9+11\..\bin\javadoc.exe' doesn't exist or is not a file. Verify the <javadocExecutable/> parameter.
# 关闭 doc 生成
mvn install -Dmaven.javadoc.skip=true -Dmaven.test.skip=true -X
# 编译后在 elastic-job-lite\elastic-job-lite-console\target 文件夹下面
# auth.properties 认证配置文件
# 默认端口 8899

添加注册中心,注意 namespace 的添加

出现数据库连接不上的情况,可能因为mysql8.0的原因,修改源码,把数据库驱动改为 com.mysql.cj.jdbc.Driver.

新版本使用 shardingsphere-elasticjob-ui

拉取源码

git clone https://github.com.cnpmjs.org/apache/shardingsphere-elasticjob-ui.git
cd shardingsphere-elasticjob-ui
# 添加 Event Trace DataSource,两种方式
# 第一种 修改 shardingsphere-elasticjob-lite-ui/shardingsphere-elasticjob-lite-ui-backend/pom.xml 文件,添加 jdbc driver
# 第二种 在 ext-lib 目录下面添加 jdbc driver mysql-connector-java-8.0.11.jar
mvn clean package -Prelease
# 编译目标在 shardingsphere-elasticjob-ui/shardingsphere-elasticjob-ui-distribution/shardingsphere-elasticjob-lite-ui-bin-distribution/target/apache-shardingsphere-${latest.release.version}-shardingsphere-elasticjob-lite-ui-bin.tar.gz

# 编译过程中遇到的问题
# Caused by: java.lang.ClassNotFoundException: com.sun.tools.javac.code.TypeTags
# lombok 版本过低,不支持高版本的 java,提升版本号即可. 

# 程序包javax.xml.bind.annotation不存在
# jdk 版本修改为 1.8

# 进入目录 启动 注意启动环境要使用 jdk 1.8
# 去掉shell脚本的 \r 
sed -i 's/\r//'
bin/start.sh
# 访问 192.168.123.197:8899

ERROR: Failed to download Chromium r686378! Set "PUPPETEER_SKIP_CHROMIUM_DOWNLOAD" 错误 和 npm 下载慢的问题

<execution>
    <id>npm config set</id>
    <goals>
        <goal>npm</goal>
    </goals>
    <configuration>
        <arguments>config set registry https://registry.npm.taobao.org</arguments>
    </configuration>
</execution>
<execution>
    <id>npm config puppeteer</id>
    <goals>
        <goal>npm</goal>
    </goals>
    <configuration>
        <arguments>config set puppeteer_download_host=https://npm.taobao.org/mirrors</arguments>
    </configuration>
</execution>

评论