Basic usage¶
1. Create a broker and app¶
from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis
client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)
2. Define a subscriber¶
Subscribers work like any FastStream subscriber. Decorate a handler with @broker.subscriber(topic):
@broker.subscriber("reminders")
async def handle_reminder(message: str) -> None:
print(f"Reminder fired: {message}")
3. Publish a timer¶
Use broker.publish() with activate_in (relative delay) or activate_at (absolute time) to schedule delivery. publish() returns the resolved timer_id — keep it for cancellation or inspection.
from datetime import UTC, datetime, timedelta
@app.after_startup
async def schedule_reminder() -> None:
# Relative — fire 24 hours from now
await broker.publish(
"Call dentist",
topic="reminders",
activate_in=timedelta(hours=24),
)
# Absolute — fire at a specific moment
await broker.publish(
"Quarterly review",
topic="reminders",
activate_at=datetime(2026, 6, 1, 9, tzinfo=UTC),
)
activate_at must be timezone-aware. Passing both activate_in and activate_at raises ValueError.
The full example:
from datetime import timedelta
from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis
client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)
@broker.subscriber("reminders")
async def handle_reminder(message: str) -> None:
print(f"Reminder fired: {message}")
@app.after_startup
async def schedule_reminder() -> None:
await broker.publish(
"Call dentist",
topic="reminders",
activate_in=timedelta(hours=24),
)
Broker options¶
| Parameter | Default | Description |
|---|---|---|
client |
required for production | redis.asyncio.Redis client instance — the caller owns its lifecycle (the broker does not close it). The constructor accepts None, but a broker without a client raises IncorrectState on the first operation; None is the test-broker shape (see Testing). |
timeline_key |
timers_timeline |
Sorted set key name |
payloads_key |
timers_payloads |
Hash key name |
start_timeout |
3.0 |
Seconds to wait for the subscriber's first Redis ping during startup |
graceful_timeout |
15.0 |
Seconds to wait for in-flight timers on shutdown |
TimersBroker does not call .aclose() on the supplied client — wrap it in
async with Redis.from_url(...) as client: (or your own try/finally) so the
connection is released when the application stops.
Timer IDs¶
Each timer has a unique timer_id. If you don't provide one, a UUID is generated automatically. You can supply your own to make a timer idempotent — publishing the same timer_id twice will overwrite the first silently (no error, no warning). This is the right behavior for idempotent retry of publish() calls but a footgun if two unrelated callers pick the same ID. Namespace your IDs (e.g., f"invoice-{invoice_id}-due") to avoid accidental collisions.
await broker.publish(
"INV-001",
topic="invoices",
timer_id="invoice-INV-001-due",
activate_in=timedelta(days=30),
)
Past activation times fire immediately¶
Both activate_in=timedelta(seconds=-5) and activate_at=datetime.now(tz=UTC) - timedelta(seconds=5) produce a timer scheduled in the past — the next subscriber poll picks it up and fires it immediately. No error is raised; this lets activate_at computations that take "marginally too long" still deliver instead of breaking. If you want strict scheduling, validate at the call site before publishing.
Cancelling timers¶
Cancel a pending timer before it fires using broker.cancel_timer(topic, timer_id). This is a no-op if the timer has already fired or does not exist. The timer_id returned from publish() is what you'd pass:
timer_id = await broker.publish(
"INV-001",
topic="invoices",
activate_in=timedelta(days=30),
)
# Later — invoice was paid early, cancel the reminder
await broker.cancel_timer("invoices", timer_id)
Inspecting pending timers¶
Three broker methods let you inspect or bulk-cancel timers without poking Redis directly:
| Method | Returns | Use for |
|---|---|---|
await broker.has_pending(topic, timer_id) |
bool |
"is timer X still scheduled?" |
await broker.get_pending_timers(topic, before=None) |
list[str] |
List pending IDs on topic — optionally filter to those due by before (a timezone-aware datetime) |
await broker.cancel_all(topic) |
int |
Cancel every timer on topic; returns the number removed |
from datetime import UTC, datetime, timedelta
# Quick existence check
if await broker.has_pending("invoices", "invoice-INV-001-due"):
...
# All pending timers on a topic
pending = await broker.get_pending_timers("invoices")
# Only those due in the next hour
soon = await broker.get_pending_timers(
"invoices", before=datetime.now(tz=UTC) + timedelta(hours=1),
)
# Wipe a topic's queue (e.g., during a maintenance reset)
removed = await broker.cancel_all("invoices")
These methods only inspect/cancel timers in the queue — handlers that have already started running are unaffected.
Leased timers still appear as pending¶
Timers that have been claimed by a worker and are currently being processed have their score pushed forward by lease_ttl seconds (the visibility-timeout pattern — see How it works). They are still in the sorted set, so:
has_pending(topic, timer_id)returnsTruewhile the handler is running.get_pending_timers(topic)(nobefore) includes them.get_pending_timers(topic, before=datetime.now(tz=UTC))excludes them — they are no longer due "by now" because their score is in the future.
If you are polling has_pending to detect "the timer fired and the handler finished", you will get false positives during the lease window. Pass before=datetime.now(tz=UTC) (or treat the timer as gone only after has_pending is False) to avoid this.
cancel_all race with executing handlers¶
If cancel_all(topic) runs while a worker is mid-handler for a leased timer on that topic, the handler runs to completion. When it finishes, its commit (the ZREM + HDEL that normally removes the timer) becomes a no-op because cancel_all has already deleted both keys for the topic. The work is not rolled back — only the bookkeeping is skipped — so handlers that have side effects (sent emails, written rows) will have already done them. Use cancel_all for queue resets, not for "stop everything in flight."
Debug logging¶
Set log_level=logging.DEBUG on TimersBroker to emit per-timer DEBUG lines: timers fetched per poll cycle, claim contested by another worker, and timer delivered to handler. Useful for diagnosing "my timer didn't fire".