Publisher¶
Use broker.publisher(topic) to get a reusable publisher object. This is useful when you want to inject the publisher as a dependency or use it from inside a handler.
Creating a publisher¶
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis
client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
reminder_publisher = broker.publisher("reminders")
Publishing from a handler¶
from datetime import timedelta
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis
client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
reminder_publisher = broker.publisher("reminders")
@broker.subscriber("orders")
async def handle_order(order_id: str) -> None:
# Schedule a follow-up reminder 7 days after the order
await reminder_publisher.publish(
f"Follow up on order {order_id}",
activate_in=timedelta(days=7),
)
Publisher options¶
The publish() method on a publisher accepts the parameters below and returns the resolved timer_id (generated or provided):
| Parameter | Default | Description |
|---|---|---|
message |
— | The message body (any serializable value) |
timer_id |
auto UUID | Unique ID for the timer — use for idempotency |
activate_in |
timedelta(0) |
Delay before delivery (fires immediately if 0) |
activate_at |
None |
Absolute timezone-aware datetime to fire at. Mutually exclusive with activate_in |
correlation_id |
same as timer_id |
Correlation ID for tracing — round-trips to the handler. Defaults to the resolved timer_id (which is itself an auto UUID when not supplied) |
headers |
None |
dict[str, Any] of headers — round-trips to the handler |
correlation_id defaults to timer_id
If you omit correlation_id, it falls back to the resolved timer_id —
not a fresh UUID. When timer_id is also auto-generated, the two end up
equal but each unique. When you supply your own timer_id (e.g.
"invoice-INV-001-due") for idempotent retries, omitting correlation_id
means every retry of that timer carries the same correlation ID. Pass
correlation_id explicitly if your tracing pipeline needs per-publish
uniqueness.
Past activation times fire immediately
activate_in=timedelta(seconds=-5) and activate_at set in the past both
schedule the timer at a negative-relative score — the next subscriber poll
picks it up and fires it. No error is raised; this lets activate_at
computations that take "marginally too long" still deliver. If you want
strict scheduling, validate at the call site before publishing.
Tracing & headers example¶
from faststream import Context
from faststream.message import StreamMessage
pub = broker.publisher("orders")
@broker.subscriber("orders")
async def handle(
body: dict,
correlation_id: str = Context("message.correlation_id"),
tenant: str = Context("message.headers.x-tenant"),
) -> None:
...
await pub.publish(
{"order_id": 42},
correlation_id="trace-abc-123",
headers={"x-tenant": "acme"},
)
Cancelling timers via a publisher¶
Publishers expose a cancel(timer_id) method that cancels a pending timer on the same topic. This is a no-op if the timer has already fired or does not exist. Capture the timer_id from publish():
pub = broker.publisher("invoices")
timer_id = await pub.publish("INV-001", activate_in=timedelta(days=30))
# Later — cancel the timer via the publisher
await pub.cancel(timer_id)
Inspecting pending timers¶
fetch_redis_timers(dt) returns all timers on this publisher's topic that are due by dt as a list of (topic, timer_id) tuples. This is useful in service tests to assert that timers were scheduled correctly without waiting for them to fire.
from datetime import UTC, datetime, timedelta
pub = broker.publisher("invoices")
await pub.publish("INV-001", timer_id="invoice-INV-001-due", activate_in=timedelta(days=30))
# Assert the timer is scheduled
pending = await pub.fetch_redis_timers(datetime.now(tz=UTC) + timedelta(days=31))
assert ("invoices", "invoice-INV-001-due") in pending
# Timers not yet due are excluded
pending_now = await pub.fetch_redis_timers(datetime.now(tz=UTC))
assert pending_now == []
In TestTimersBroker, fetch_redis_timers always returns [] because messages are delivered immediately — there are no pending timers to inspect.