# Celery
The `logfire.instrument_celery()` method creates a span for every task executed by your Celery workers. The integration also supports Celery beat.
## Installation
Install **logfire** with the `celery` extra:

```bash
pip install 'logfire[celery]'
# or
uv add 'logfire[celery]'
# or
poetry add 'logfire[celery]'
```
## Celery Worker
> **Info**
>
> The broker you use doesn't matter for the Celery instrumentation. Any broker supported by Celery will work.
For our example, we'll use Redis. You can run it with Docker:

```bash
docker run --rm -d -p 6379:6379 redis
```
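If you want to check that the broker is up before starting a worker, a quick TCP probe is enough. This is just a convenience sketch, not part of Logfire or Celery (the `broker_reachable` helper is hypothetical):

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the broker can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Returns True once the Redis container above is running.
print(broker_reachable())
```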
Below is a minimal example using Celery. You can run it with `celery -A tasks worker --loglevel=info`:

```python
import logfire
from celery import Celery
from celery.signals import worker_init


@worker_init.connect()
def init_worker(*args, **kwargs):
    logfire.configure(service_name="worker")
    logfire.instrument_celery()


app = Celery("tasks", broker="redis://localhost:6379/0")


@app.task
def add(x: int, y: int):
    return x + y


add.delay(42, 50)
```
## Celery Beat
As mentioned above, you may also have periodic tasks scheduled with Celery beat. Let's add beat to the previous example:
```python
import logfire
from celery import Celery
from celery.signals import worker_init, beat_init


@worker_init.connect()
def init_worker(*args, **kwargs):
    logfire.configure(service_name="worker")
    logfire.instrument_celery()


@beat_init.connect()
def init_beat(*args, **kwargs):
    logfire.configure(service_name="beat")
    logfire.instrument_celery()


app = Celery("tasks", broker="redis://localhost:6379/0")

app.conf.beat_schedule = {
    "add-every-30-seconds": {
        "task": "tasks.add",
        "schedule": 30.0,
        "args": (16, 16),
    },
}


@app.task
def add(x: int, y: int):
    return x + y
```
The code above schedules the `add` task to run every 30 seconds with the arguments `16` and `16`.
To run the beat, use the following command:

```bash
celery -A tasks beat --loglevel=info
```
The keyword arguments of `logfire.instrument_celery()` are passed to the `CeleryInstrumentor().instrument()` method.