首页 - 关于研博 - 技术笔记 - PowerJob进阶—MapProcessor
PowerJob进阶—MapProcessor
2024.10.15

前言

在学习完powerjob并了解基本工作原理之后,基于业务需要基础的BasicProcessor(单机处理器)不能完美的契合业务需求,BasicProcessor如果一次任务中处理设备量过大,那么则会造成单个任务实例负载过高,并且系统压力过大,因此选用了支持将单个任务中全部设备,通过将任务分批拆分为多个包含少量设备的子任务的MapProcessor。

 

 

MapProcessor介绍

 

 

MapProcessor接口继承了BasicProcessor接口并且对其进行再次增强,使其可以通过子任务的方式将任务拆分执行,对执行任务的节点更加友好,在MapProcessor中可以通过MapProcessor接口的isRoot()方法判断该次执行的任务是否为根任务,调用该方法会返回一个布尔值,为根任务,则可以构建子任务,需手动创建子任务实体类,用于存入子任务执行时所需要的参数,此次在项目中使用则是解决设备功能调用中同时给大批量设备同时下发功能调用指令.考虑到后期设备可能过多,所以选择MapProcessor处理器通过构建子任务的方式将,任务拆分,每个子任务携带部分设备ID分别执行,减轻单次执行任务时的压力。

 

具体实现

 

  

1. 定义子任务类

 

用于在构建子任务时携带执行所需参数。

2.创建执行执行器并实现MapProcessor接口

在实现接口时需重写process()方法即执行的任务逻辑

 

3. isRoot()

通过MapProcessor执行任务,无论是根任务还是子任务都是执行的同一个process()方法,所以MapProcessor接口提供了isRoot()方法用来判断,当前执行的任务是根任务还是子任务。

4.判断为根任务

当isRoot()方法返回true时,则表明当前任务为根任务,此时需构建子任务集合,例如此任务需操作1000个设备,那么则可以构建5个子任务将1000个设备ID分别保存到5个子任务对象中,然后将五个子任务对象放入到集合中,再通过调用map()方法,将子任务集合及任务名,传入到taskContext,任务上下文中,以便后续的调用。

5.判断不为根任务

本文仅以简单的二层任务结构距离,如果判断不为根任务则此时一定是子任务,只需要通过taskContext调用getSubTask()方法,将我们创建的子任务取出。

并根据先前在创建子任务时传入的执行所需参数,进行对应的业务处理。

 

  

 

创建任务

在创建需要使用MapProcessor处理器处理的任务时,其他的参数与我们之前创建演示Demo无异,只是需要在执行配置处,选择Map执行,并将我们所编写的处理器全类名填写正确.如果没有选择Map执行,任务实例日志中会显示。

此时则需要检查,创建任务的参数是否填选正确。

 

后续的任务执行则无特别操作,只需要在任务管理页面,通过滑块来确定是否需要执行该任务.并在运行实例中可以查看每次任务执行的日志。

 

 

拓展: MapReduceProcessor

 

  

此处理器则是在MapProcessor处理器基础上,添加了reduce()方法,在实现该接口时除了需要重写process方法外还需要重写reduce方法.reduce方法则是根据process()方法的返回值,对子任务的执行状态进行统计,方便后续对子任务的计算操作,单此处理器在执行计算统计时,需要对数据库进行全量扫描,对节点内存压力过大,谨慎使用,如果只是简单的对任务进行拆分,不需要后续再对子任务处理结果进行计算操作,MapProcessor即可进行处理。

 

获取相关资料
下载地址将会发送至您填写的邮箱
相关新闻
skywalking快速入门
2024-10-22
MQTT协议及5.0版本特性
2024-10-22
MQTT协议介绍及Java教程
2024-10-22
  • 在线客服
  • 电话咨询
  • 微信
  • 短视频