任务调度
Tasks 模块提供 xtils 的并发基础:事件循环、线程池、任务组、定时器(单调时钟和墙上时钟)、Cron 调度器和类型化事件分发。
概述
xtils 中的所有异步工作都通过 TaskRunner 抽象进行。无论是套接字变为可读、定时器触发,还是从另一个线程提交的任务 — 一切都由 TaskRunner 在其所属线程上处理。
TaskRunner — 抽象接口
cpp
#include "xtils/tasks/task_runner.h"
using Task = std::function<void()>;
class TaskRunner {
virtual void PostTask(Task task) = 0;
virtual void PostDelayedTask(Task task, uint32_t delay_ms) = 0;
virtual void AddFileDescriptorWatch(PlatformHandle fd, Task callback) = 0;
virtual void RemoveFileDescriptorWatch(PlatformHandle fd) = 0;
virtual bool RunsTasksOnCurrentThread() const = 0;
};UnixTaskRunner — epoll/poll 事件循环
基于 epoll(Linux)构建的主事件循环实现:
cpp
#include "xtils/tasks/unix_task_runner.h"
UnixTaskRunner runner;
runner.AddFileDescriptorWatch(socket_fd, [&]() {
char buf[1024];
ssize_t n = read(socket_fd, buf, sizeof(buf));
});
runner.PostDelayedTask([]() {
LogI("1 秒后触发");
}, 1000);
runner.Run(); // 阻塞,处理事件
runner.Quit(); // 停止循环ThreadTaskRunner — 专用线程
在自己的线程上包装 UnixTaskRunner。线程安全:可从任何线程 PostTask。
cpp
#include "xtils/tasks/thread_task_runner.h"
auto runner = ThreadTaskRunner::CreateAndStart("io_thread");
// 从任何线程提交工作
runner.PostTask([]() {
// 在专用 io_thread 上运行
});
runner.PostDelayedTask([]() {
LogI("延迟任务");
}, 2000);TaskGroup — 并行/串行执行
cpp
#include "xtils/tasks/task_group.h"
// 创建并行线程池
auto pool = TaskGroup::Parallel(4);
// 创建串行执行器
auto seq = TaskGroup::Sequential();
// 提交任务
pool->PostTask([]() { HeavyComputation(); });
pool->PostAsyncTask([]() { LogI("延迟工作"); }, 1000);
// 阻塞等待结果
auto result = pool->RunUntilCompleted([]() {
return ComputeExpensiveResult();
});定时器
SteadyTimer(单调时钟)
最适合相对计时 — 不受系统时钟变化影响:
cpp
#include "xtils/tasks/timer.h"
SteadyTimer timer(task_group.get());
// 一次性定时器
auto id = timer.SetRelativeTimer(5000, []() {
LogI("5 秒后触发一次");
}, TimerType::OneShot);
// 重复定时器
auto id2 = timer.SetRepeatingTimer(1000, []() {
LogI("每秒触发");
});
// 取消
timer.CancelTimer(id);
timer.CancelAllTimers();SystemTimer(墙上时钟 / UTC)
用于绝对时间调度(例如「在 2025-01-01 00:00:00 UTC 触发」):
cpp
SystemTimer timer(task_group.get());
uint64_t target_utc_ms = 1735689600000;
timer.SetAbsoluteUtcTimer(target_utc_ms, []() {
LogI("新年快乐!");
});TIP
使用 SteadyTimer(别名 MonotonicTimer)处理间隔和超时。使用 SystemTimer(别名 UtcTimer)进行日历调度。
CronScheduler
使用 cron 表达式或简单间隔调度周期性任务:
cpp
#include "xtils/tasks/cron_scheduler.h"
CronScheduler cron(480); // UTC+8
// 简单间隔:每 30 秒
auto id1 = cron.every(Seconds(30), []() {
LogI("每 30 秒");
});
// Cron 风格:每小时的第 0 分钟
// cron(秒, 分, 时, 日, 月, 星期几, 回调)
// 空集 = 通配符(匹配所有)
auto id2 = cron.cron(
{0}, {0}, {}, {}, {}, {},
[]() { LogI("整点任务"); }
);
// 每个工作日早上 9:00
auto id3 = cron.cron(
{0}, {0}, {9}, {}, {}, {1,2,3,4,5},
[]() { LogI("早安!"); }
);
cron.start();
cron.cancel(id1);
cron.stop();EventManager
跨线程的类型化发布/订阅事件分发:
cpp
#include "xtils/tasks/event.h"
auto tg = TaskGroup::Parallel(2);
EventManager events(tg);
// 定义事件类型
struct DataReady { std::vector<int> data; };
// 订阅
events.Connect<DataReady>([](const DataReady& e) {
LogI("收到 %zu 个项目", e.data.size());
});
// 发布(可从任何线程调用)
events.Emit(DataReady{{1, 2, 3, 4, 5}});基于枚举的事件
cpp
enum class Signal { Refresh, Pause, Resume };
events.Connect<Signal>(Signal::Refresh, [](const Signal&) {
LogI("正在刷新...");
});
events.Emit(Signal::Refresh);完整示例
cpp
#include <xtils/tasks/task_group.h>
#include <xtils/tasks/timer.h>
#include <xtils/tasks/cron_scheduler.h>
#include <xtils/tasks/event.h>
#include <xtils/logging/logger.h>
using namespace xtils;
struct MetricsCollected { double cpu; double memory; };
int main() {
auto pool = TaskGroup::Parallel(4);
SteadyTimer timer(pool.get());
EventManager events(pool);
// 每 5 秒收集指标
timer.SetRepeatingTimer(5000, [&]() {
pool->PostTask([&]() {
double cpu = MeasureCpu();
double mem = MeasureMemory();
events.Emit(MetricsCollected{cpu, mem});
});
});
// 响应指标
events.Connect<MetricsCollected>([](const MetricsCollected& m) {
LogI("CPU: %.1f%%, 内存: %.1f%%", m.cpu, m.memory);
if (m.cpu > 90.0) LogW("CPU 使用率过高!");
});
// Cron:每天凌晨 3 点清理
CronScheduler cron(480);
cron.cron({0}, {0}, {3}, {}, {}, {}, []() {
LogI("执行每日清理...");
CleanupOldLogs();
});
cron.start();
pool->MainRunner()->Run();
return 0;
}