Java实现通用本地延迟队列

N 人看过

这里使用了delayed 来实现延迟队列。

1. 定义函数接口

@FunctionalInterface
public interface EventFunction {

    void apply();
}

这个是用于执行函数,Java自带的有 Consumer 和 Function 等其他,因为我这边不需要入参和结果,所以我自己重新定义了一个。


2. 定义实体类

public class DelayedEntity implements Delayed {

    private final String id;
    private final long expireTime;
    private final EventFunction function;

    public DelayedEntity(long expireTime, EventFunction function) {
        super();
        this.expireTime = expireTime;
        this.function = function;
        // HuTool工具自动生成id,用于排查问题
        this.id = IdUtil.fastSimpleUUID();
    }

    public void execute() {
        function.apply();
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return expireTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    public String getId() {
        return id;
    }
}

3. 定义消费者

@Slf4j
@Component
public class DelayEventConsumer implements Runnable {

    private final DelayQueue<DelayedEntity> queue = new DelayQueue<>();

    public DelayEventConsumer() {
    }

    public void submit(long expireTime, EventFunction function) {
        queue.put(new DelayedEntity(expireTime, function));
    }


    @Override
    public void run() {
        DelayedEntity entity = null;
        while (true) {
            try {
                entity = queue.poll();
                if (entity == null) {
                    continue;
                }
                log.info("延迟队列 [{}] 执行消息", entity.getId());
                entity.execute();
                log.info("延迟队列 [{}] 执行完毕", entity.getId());
            } catch (Exception e) {
                log.error(String.format("延迟队列 [%s] 执行报错", entity != null ? entity.getId() : "null"), e);
            }
        }
    }
}

4. 定义线程池

定义一个只有一个线程的线程池,在程序启动的时候执行。

@Component
public class ApplicationStartListener implements ApplicationListener<ApplicationStartedEvent> {

    @Autowired
    private DelayEventConsumer delayEventConsumer;

    @Override
    public void onApplicationEvent(@NotNull ApplicationStartedEvent event) {
        // HuTool工具生成只有一个线程的线程池,将消费者放进去执行
        ThreadUtil.newSingleExecutor().submit(delayEventConsumer);
    }

}

5. 测试

写了个接口用来测试

@Slf4j
@RestController
@RequestMapping("/test")
@RequiredArgsConstructor
public class TestController {
  private final DelayEventConsumer delayEventConsumer;
  private final UserTagService userTagService;
  
  @GetMapping("/testDelayQueue")
    public void testDelayQueue() {
        log.info("提交事件");
        // 延迟2秒执行
        delayEventConsumer.submit(System.currentTimeMillis() + 2000, () -> {
            List<UserTags> userTags = userTagService.randomRobot();
            log.info(JsonUtil.toJsonStr(userTags));
        });
    }
}

结果

2023-03-08 17:27:49.631  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : 提交事件
2023-03-08 17:27:51.755  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : 延迟队列 [c195406b95a048f7b27825f6839dbe98] 执行消息
2023-03-08 17:27:52.475  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : [{"country":"IN","createTime":1677764575237,"id":4976,"level":20,"nickname":"👑 ptB_Colin","sid":104485,"type":"robot","userId":1368812591190769664}]
2023-03-08 17:27:52.476  INFO 16792 --- [ool-13-thread-1] net.vitalblog.Logger   : 延迟队列 [c195406b95a048f7b27825f6839dbe98] 执行完毕