Tasks & Scheduling
The Tasks module provides the concurrency foundation for xtils: event loops, thread pools, task groups, timers (monotonic and wall-clock), a cron scheduler, and typed event dispatch.
Overview
All async work in xtils flows through the TaskRunner abstraction. A socket becoming readable, a timer firing, and a task posted from another thread are all processed by a TaskRunner on its owning thread.
┌─────────────────────────────────────────────┐
│              Your Application               │
│                                             │
│  ┌────────────┐ ┌────────────────────────┐  │
│  │ TaskGroup  │ │ ThreadTaskRunner       │  │
│  │ (workers)  │ │ (dedicated I/O thread) │  │
│  └────────────┘ └────────────────────────┘  │
│        │                    │               │
│        └──── TaskRunner ────┘               │
└─────────────────────────────────────────────┘
TaskRunner — Abstract Interface
#include "xtils/tasks/task_runner.h"
using Task = std::function<void()>;
class TaskRunner {
 public:
  virtual void PostTask(Task task) = 0;
  virtual void PostDelayedTask(Task task, uint32_t delay_ms) = 0;
  virtual void AddFileDescriptorWatch(PlatformHandle fd, Task callback) = 0;
  virtual void RemoveFileDescriptorWatch(PlatformHandle fd) = 0;
  virtual bool RunsTasksOnCurrentThread() const = 0;
};
This is the core abstraction: all networking, timers, and I/O watches ultimately use a TaskRunner.
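To make the posting contract concrete, here is a minimal single-threaded sketch of PostTask and PostDelayedTask. It is not the xtils implementation: `ManualTaskRunner`, `AdvanceBy`, `DemoOrder`, and the fake millisecond clock are hypothetical names invented for illustration, and the file-descriptor methods are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>
#include <vector>

using Task = std::function<void()>;

// Hypothetical, illustration-only runner: tasks are keyed by their due time on
// a fake millisecond clock and run in due-time order (FIFO among equal times).
class ManualTaskRunner {
 public:
  void PostTask(Task task) { PostDelayedTask(std::move(task), 0); }

  void PostDelayedTask(Task task, uint32_t delay_ms) {
    assert(task != nullptr);  // an empty std::function would throw when run
    queue_.emplace(now_ms_ + delay_ms, std::move(task));
  }

  // Advances the fake clock and runs every task that has become due,
  // including tasks posted by the tasks themselves.
  void AdvanceBy(uint32_t ms) {
    const uint64_t deadline = now_ms_ + ms;
    while (!queue_.empty() && queue_.begin()->first <= deadline) {
      auto it = queue_.begin();
      now_ms_ = it->first;
      Task task = std::move(it->second);
      queue_.erase(it);
      task();
    }
    now_ms_ = deadline;
  }

 private:
  uint64_t now_ms_ = 0;
  std::multimap<uint64_t, Task> queue_;
};

// Records the order in which two immediate tasks and one delayed task run.
std::vector<int> DemoOrder() {
  ManualTaskRunner runner;
  std::vector<int> order;
  runner.PostDelayedTask([&] { order.push_back(3); }, 1000);
  runner.PostTask([&] { order.push_back(1); });
  runner.PostTask([&] { order.push_back(2); });
  runner.AdvanceBy(999);  // only the two immediate tasks are due
  assert(order.size() == 2);
  runner.AdvanceBy(1);    // the delayed task is now due
  return order;
}
```

A real event loop would block on the OS (epoll or poll) instead of a fake clock, but the ordering rule is the same idea: immediate tasks run in posting order, delayed tasks run once their delay has elapsed.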
UnixTaskRunner — epoll/poll Event Loop
The primary event loop implementation, built on epoll (Linux) with poll fallback:
#include "xtils/tasks/unix_task_runner.h"
UnixTaskRunner runner;
// Add I/O watch
runner.AddFileDescriptorWatch(socket_fd, [&]() {
  // Socket is readable
  char buf[1024];
  ssize_t n = read(socket_fd, buf, sizeof(buf));
  // n > 0: bytes received; n == 0: peer closed; n < 0: error, check errno
});
// Post a delayed task
runner.PostDelayedTask([]() {
  LogI("Fired after 1 second");
}, 1000);
// Block and process events (call this from your main thread)
runner.Run();
// Stop from another thread or callback
runner.Quit();
ThreadTaskRunner — Dedicated Thread
Wraps a UnixTaskRunner running on its own thread. Thread-safe: you can PostTask from any thread.
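The thread-safety claim can be pictured as a mutex-protected queue drained by one worker thread. The sketch below assumes nothing about xtils internals: `SingleThreadRunner` and `DemoRunsOnWorker` are hypothetical stand-ins built only on the standard library, with no delayed tasks or I/O watches.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in, not the xtils implementation.
class SingleThreadRunner {
 public:
  SingleThreadRunner() : worker_([this] { Loop(); }) {}

  // Runs the tasks that are already queued, then joins the thread.
  ~SingleThreadRunner() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      quit_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Safe to call from any thread.
  void PostTask(std::function<void()> task) {
    assert(task != nullptr);
    {
      std::lock_guard<std::mutex> lock(mu_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void Loop() {
    std::unique_lock<std::mutex> lock(mu_);
    for (;;) {
      cv_.wait(lock, [this] { return quit_ || !tasks_.empty(); });
      if (tasks_.empty()) return;  // quit requested and queue drained
      auto task = std::move(tasks_.front());
      tasks_.pop_front();
      lock.unlock();
      task();  // run without the lock so tasks can post more work
      lock.lock();
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool quit_ = false;
  std::thread worker_;  // declared last: the other members exist before Loop() starts
};

// Posts three tasks from the calling thread; returns true if they ran in
// posting order on a thread other than the caller.
bool DemoRunsOnWorker() {
  const std::thread::id caller = std::this_thread::get_id();
  std::vector<int> order;
  bool off_caller = true;
  {
    SingleThreadRunner runner;
    for (int i = 0; i < 3; ++i) {
      runner.PostTask([&, i] {
        order.push_back(i);
        off_caller = off_caller && std::this_thread::get_id() != caller;
      });
    }
  }  // destructor drains the queue and joins the worker
  return off_caller && order == std::vector<int>{0, 1, 2};
}
```

Because only the worker thread touches task state, code posted to such a runner usually needs no further locking. The xtils type itself is used as follows: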
#include "xtils/tasks/thread_task_runner.h"
// Create and start (value type, movable)
auto runner = ThreadTaskRunner::CreateAndStart("io_thread");
// Or as shared_ptr (a distinct name avoids redeclaring runner)
auto shared_runner = ThreadTaskRunner::CreateAndStartShared("io_thread");
// Post work from any thread
runner.PostTask([]() {
  // Runs on the dedicated io_thread
});
runner.PostDelayedTask([]() {
  LogI("Delayed task on io_thread");
}, 2000);
// Access the underlying UnixTaskRunner
UnixTaskRunner* raw = runner.get();
TaskGroup — Parallel/Sequential Execution
Task groups manage pools of workers:
#include "xtils/tasks/task_group.h"
// Create a parallel pool with 4 workers (default: hardware_concurrency threads)
auto pool = TaskGroup::Parallel(4);
// Create a sequential executor (single worker thread)
auto seq = TaskGroup::Sequential();
Posting Tasks
// Post and forget
pool->PostTask([]() {
  // Runs on any available worker
  HeavyComputation();
});
// Post with delay
pool->PostAsyncTask([]() {
  LogI("Delayed worker task");
}, 1000);
RunUntilCompleted — Blocking Future
// Block calling thread until task completes, return result
auto result = pool->RunUntilCompleted([]() {
  return ComputeExpensiveResult();
});
LogI("Result: %d", result);
Lifecycle
bool busy = pool->IsBusy();           // Any tasks pending/running?
int workers = pool->Size();           // Number of worker threads
pool->Stop();                         // Signal stop
bool stopped = pool->StopWaitAll(5s); // Stop and wait (with timeout); 5s needs std::chrono_literals
auto runner = pool->MainRunner();     // Get main thread task runner
Timers
SteadyTimer (Monotonic Clock)
Best for relative timing because it is not affected by system clock changes:
#include "xtils/tasks/timer.h"
SteadyTimer timer(task_group.get()); // task_group: a TaskGroup, e.g. from TaskGroup::Parallel(4)
// One-shot timer
auto id = timer.SetRelativeTimer(5000, []() {
  LogI("Fires once after 5 seconds");
}, TimerType::OneShot);
// Repeating timer
auto id2 = timer.SetRepeatingTimer(1000, []() {
  LogI("Fires every second");
});
// Absolute timer (fire at a specific steady_clock point)
auto deadline = SteadyTimer::GetCurrentTimestampMs() + 10000;
timer.SetAbsoluteTimer(deadline, []() { /* ... */ });
// Cancel
timer.CancelTimer(id);
timer.CancelAllTimers();
// Get current monotonic time
uint64_t now = SteadyTimer::GetCurrentTimestampMs();
SystemTimer (Wall Clock / UTC)
For absolute time scheduling (e.g., "fire at 2025-01-01 00:00:00 UTC"):
SystemTimer timer(task_group.get());
// Fire at a specific UTC timestamp
uint64_t target_utc_ms = 1735689600000; // 2025-01-01 00:00:00 UTC, in milliseconds
timer.SetAbsoluteUtcTimer(target_utc_ms, []() {
  LogI("Happy New Year!");
});
uint64_t now_utc = SystemTimer::GetCurrentUtcTimestampMs();
TIP
Use SteadyTimer (aliased as MonotonicTimer) for intervals and timeouts. Use SystemTimer (aliased as UtcTimer) for calendar-based scheduling.
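The split between the two timers mirrors the two standard-library clocks. The sketch below is plain C++ rather than xtils code; `SteadyNowMs`, `UtcNowMs`, `SteadyDeadlineMs`, and `DelayUntilUtcMs` are hypothetical helpers showing how a timeout and a calendar target could each be expressed.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// The monotonic clock never goes backwards. The wall clock can be adjusted
// (NTP, manual changes), so it only suits calendar targets.
static_assert(std::chrono::steady_clock::is_steady,
              "steady_clock is monotonic");

// Milliseconds on the monotonic clock (arbitrary epoch, often boot time).
uint64_t SteadyNowMs() {
  using namespace std::chrono;
  return static_cast<uint64_t>(
      duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
          .count());
}

// Milliseconds since the Unix epoch according to the wall clock
// (the epoch is guaranteed since C++20 and universal in practice before).
uint64_t UtcNowMs() {
  using namespace std::chrono;
  return static_cast<uint64_t>(
      duration_cast<milliseconds>(system_clock::now().time_since_epoch())
          .count());
}

// Interval or timeout: a deadline on the monotonic clock.
uint64_t SteadyDeadlineMs(uint64_t timeout_ms) {
  const uint64_t now = SteadyNowMs();
  assert(timeout_ms <= UINT64_MAX - now);  // guard against wrap-around
  return now + timeout_ms;
}

// Calendar target: remaining wait for a UTC timestamp, clamped at zero for
// targets that have already passed.
uint64_t DelayUntilUtcMs(uint64_t target_utc_ms, uint64_t now_utc_ms) {
  return target_utc_ms > now_utc_ms ? target_utc_ms - now_utc_ms : 0;
}
```

For example, `DelayUntilUtcMs(1735689600000, UtcNowMs())` gives the remaining wait until 2025-01-01 00:00:00 UTC. If the wall clock is adjusted in the meantime the value has to be recomputed, which is why a fixed interval is better tracked with a monotonic deadline.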
CronScheduler
Schedule recurring tasks with cron-style expressions or simple intervals:
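The cron-style form in this section takes one set per time field, with an empty set acting as a wildcard. A minimal sketch of that matching rule follows. `CronSpec`, `BrokenDownTime`, `FieldMatches`, `Matches`, and `DemoWeekdayMorning` are hypothetical names, not CronScheduler internals, and the weekday numbering (0 = Sunday through 6 = Saturday) is an assumption chosen to agree with the Mon-Fri example.

```cpp
#include <cassert>
#include <set>

// Hypothetical types for illustration only.
struct CronSpec {
  std::set<int> seconds, minutes, hours, days, months, weekdays;
};

struct BrokenDownTime {
  int second, minute, hour, day, month, weekday;  // weekday: 0 = Sunday (assumed)
};

// An empty set is a wildcard; otherwise the value must be listed.
bool FieldMatches(const std::set<int>& allowed, int value) {
  assert(value >= 0);
  return allowed.empty() || allowed.count(value) > 0;
}

// A task fires at time t only if every field matches.
bool Matches(const CronSpec& spec, const BrokenDownTime& t) {
  return FieldMatches(spec.seconds, t.second) &&
         FieldMatches(spec.minutes, t.minute) &&
         FieldMatches(spec.hours, t.hour) &&
         FieldMatches(spec.days, t.day) &&
         FieldMatches(spec.months, t.month) &&
         FieldMatches(spec.weekdays, t.weekday);
}

// "Every weekday at 09:00:00" checked against a Monday and a Sunday.
bool DemoWeekdayMorning() {
  const CronSpec spec{{0}, {0}, {9}, {}, {}, {1, 2, 3, 4, 5}};
  const BrokenDownTime monday{0, 0, 9, 6, 1, 1};  // 2025-01-06
  const BrokenDownTime sunday{0, 0, 9, 5, 1, 0};  // 2025-01-05
  return Matches(spec, monday) && !Matches(spec, sunday);
}
```

A scheduler built this way would presumably convert the current time to local fields using its configured offset and test each registered spec once per second. The xtils API itself looks like this: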
#include "xtils/tasks/cron_scheduler.h"
CronScheduler cron(/* tzOffsetMinutes */ 480); // UTC+8
// Simple interval: every 30 seconds
auto id1 = cron.every(Seconds(30), []() {
  LogI("Every 30 seconds");
});
// Cron-style: at minute 0 of every hour
// cron(seconds, minutes, hours, days, months, weekdays, callback)
// Empty set = wildcard (match all)
auto id2 = cron.cron(
    {0},  // second 0
    {0},  // minute 0
    {},   // every hour
    {},   // every day
    {},   // every month
    {},   // every weekday
    []() { LogI("Hourly task"); }
);
// Every weekday at 9:00 AM
auto id3 = cron.cron(
    {0}, {0}, {9},  // 09:00:00
    {},             // any day of month
    {},             // any month
    {1,2,3,4,5},    // Mon-Fri
    []() { LogI("Good morning!"); }
);
cron.start();
// Later...
cron.cancel(id1);
auto info = cron.getTaskInfo(id2); // Returns std::optional<TaskInfo>
cron.stop();
EventManager
Typed pub/sub event dispatch across threads:
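Conceptually, typed dispatch comes down to a map from event type to a list of handlers. The sketch below shows one way to build that with `std::type_index`. `TinyEventBus` and `DemoDispatch` are hypothetical, and the bus is synchronous: it calls handlers inline on the emitting thread, whereas EventManager fires callbacks on a task group thread.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical synchronous bus for illustration; not the xtils EventManager.
class TinyEventBus {
 public:
  template <typename Event>
  void Connect(std::function<void(const Event&)> handler) {
    assert(handler != nullptr);
    // Store a type-erased wrapper under the event's type.
    handlers_[std::type_index(typeid(Event))].push_back(
        [h = std::move(handler)](const void* e) {
          h(*static_cast<const Event*>(e));
        });
  }

  template <typename Event>
  void Emit(const Event& event) const {
    auto it = handlers_.find(std::type_index(typeid(Event)));
    if (it == handlers_.end()) return;  // nobody subscribed to this type
    for (const auto& handler : it->second) handler(&event);
  }

 private:
  using Erased = std::function<void(const void*)>;
  std::unordered_map<std::type_index, std::vector<Erased>> handlers_;
};

struct DataReady { std::vector<int> data; };
struct ShutdownRequest {};

// Returns how many DataReady items the subscriber saw.
std::size_t DemoDispatch() {
  TinyEventBus bus;
  std::size_t items = 0;
  bus.Connect<DataReady>([&](const DataReady& e) { items += e.data.size(); });
  bus.Emit(DataReady{{1, 2, 3}});
  bus.Emit(ShutdownRequest{});  // no handlers registered: ignored
  return items;
}
```

A cross-thread version would post each handler call to a task runner instead of invoking it inline. The xtils API is used like this: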
#include "xtils/tasks/event.h"
auto tg = TaskGroup::Parallel(2);
EventManager events(tg);
// Define event types
struct DataReady { std::vector<int> data; };
struct ShutdownRequest {};
// Subscribe (callbacks fire on task group thread)
events.Connect<DataReady>([](const DataReady& e) {
  LogI("Got %zu items", e.data.size());
});
events.Connect<ShutdownRequest>([](const ShutdownRequest&) {
  LogI("Shutdown requested");
});
// Publish (can be called from any thread)
events.Emit(DataReady{{1, 2, 3, 4, 5}});
events.Emit(ShutdownRequest{});
Enum-Based Events
For lightweight signaling without payloads:
enum class Signal { Refresh, Pause, Resume };
events.Connect<Signal>(Signal::Refresh, [](const Signal&) {
  LogI("Refreshing...");
});
events.Emit(Signal::Refresh);
Patterns
Producer-Consumer
auto pool = TaskGroup::Parallel(4);
SteadyTimer timer(pool.get());
// Producer: generate work items periodically
timer.SetRepeatingTimer(100, [&]() {
  auto item = FetchNextItem();
  pool->PostTask([item]() {
    ProcessItem(item);
  });
});
Periodic Polling with Backoff
auto runner = ThreadTaskRunner::CreateAndStart("poller");
// poll captures itself and runner by reference, so both must outlive every scheduled run
std::function<void()> poll = [&]() {
  auto result = TryFetch();
  uint32_t next_delay = result.has_value() ? 1000 : 5000; // Back off to 5 s on failure
  runner.PostDelayedTask(poll, next_delay);
};
runner.PostTask(poll);
Complete Example
#include <xtils/tasks/task_group.h>
#include <xtils/tasks/timer.h>
#include <xtils/tasks/cron_scheduler.h>
#include <xtils/tasks/event.h>
#include <xtils/logging/logger.h>
using namespace xtils;
// MeasureCpu(), MeasureMemory(), and CleanupOldLogs() are application-defined helpers
struct MetricsCollected { double cpu; double memory; };
int main() {
  auto pool = TaskGroup::Parallel(4);
  SteadyTimer timer(pool.get());
  EventManager events(pool);
  // Collect metrics every 5 seconds
  timer.SetRepeatingTimer(5000, [&]() {
    pool->PostTask([&]() {
      double cpu = MeasureCpu();
      double mem = MeasureMemory();
      events.Emit(MetricsCollected{cpu, mem});
    });
  });
  // React to metrics
  events.Connect<MetricsCollected>([](const MetricsCollected& m) {
    LogI("CPU: %.1f%%, Memory: %.1f%%", m.cpu, m.memory);
    if (m.cpu > 90.0) LogW("High CPU usage!");
  });
  // Cron: daily cleanup at 3 AM
  CronScheduler cron(480); // UTC+8
  cron.cron({0}, {0}, {3}, {}, {}, {}, []() {
    LogI("Running daily cleanup...");
    CleanupOldLogs();
  });
  cron.start();
  // Block main thread
  pool->MainRunner()->Run();
  return 0;
}