This section assumes you have a basic understanding of the core concepts of task queues and TacoQ. Read the Core Concepts section if you haven't already.
We recommend using UV to run Python projects.
TacoQ requires Postgres, RabbitMQ, and the Relay to be running. Let's start
by creating a docker-compose.yml
file to launch them:
services:
relay:
image: ghcr.io/taco-xyz/tacoq-relay:latest
ports:
- "3000:3000"
depends_on:
rabbitmq:
condition: service_healthy
postgres:
condition: service_healthy
environment:
TACOQ_DATABASE_URL: postgresql://user:password@postgres:5432/tacoq
TACOQ_BROKER_URL: amqp://user:password@rabbitmq:5672
rabbitmq:
image: rabbitmq:4-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: user
RABBITMQ_DEFAULT_PASS: password
postgres:
image: postgres:latest
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: tacoq
ports:
- "5432:5432"
Run docker compose up
to start the services, and we're ready to go!
With the infrastructure running, we want to create a worker that can execute tasks. Let's start by installing the TacoQ Python SDK:
pip install tacoq
or, for UV users:
uv init
uv add tacoq
The worker must know how to receive new task assignments and send updates through the broker. We'll begin by creating a new script with the worker application.
To start, configure the broker so that the worker knows where to send tasks to.
from tacoq import BrokerConfig
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
Next, we'll configure the worker application.
from tacoq import BrokerConfig
from tacoq import WorkerApplication, WorkerApplicationConfig
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
worker_config = WorkerApplicationConfig(
name="my_worker_1", # The name of the worker for observability purposes.
kind="my_worker_kind", # The kind of worker to route tasks to.
broker_config=broker_config,
broker_prefetch_count=5, # How many tasks to prefetch and execute concurrently.
)
worker_app = WorkerApplication(config=worker_config)
Note that the worker field
kind
is set to"my_worker_kind"
. This field will be used by the publisher to know which set of workers to send the task to. We recommend using environment variables to align these values.
The worker application has been created, but it doesn't know how to handle any tasks that come its way. So, let's teach it to handle a task.
We'll create some models for the task input and output. These will be used throughout our application.
from pydantic import BaseModel
# Create Pydantic models for the task input and output.
class TestInputPydanticModel(BaseModel):
name: str
class TestOutputPydanticModel(BaseModel):
message: str
Then, we'll create a handler function for the task.
from tacoq import WorkerApplication, WorkerApplicationConfig, BrokerConfig
from pydantic import BaseModel
import asyncio
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
worker_config = WorkerApplicationConfig(
name="my_worker_1", # The name of the worker for observability purposes.
kind="my_worker_kind", # The kind of worker to route tasks to.
broker_config=broker_config,
broker_prefetch_count=5, # How many tasks to prefetch and execute concurrently.
)
worker_app = WorkerApplication(config=worker_config)
# Models
class TestInputPydanticModel(BaseModel):
name: str
class TestOutputPydanticModel(BaseModel):
message: str
# Define the task handler
@worker_app.task(kind="task_hello_world")
async def task_hello_world(input_data: TestInputPydanticModel) -> TestOutputPydanticModel:
# Extract the input data.
name = input_data.name
# Simulate some work.
await asyncio.sleep(3)
# The results are automatically encoded into bytes and sent back to the broker
return TestOutputPydanticModel(
message=f"Hello, {name}!",
)
When a task's input and output data reach the broker, they must be encoded in byte format.
TacoQ provides a default encoder and decoder that can handle JSON objects with the language's most commonly used libraries. These are well tested and should work for most use cases.
- Python:
pydantic
- Rust:
serde_json
You can also write your own encoders and decoders. Simply peek into the source code of the default ones and implement the required methods.
Note the task field
kind
is set to"task_hello_world"
. You can think about it the following way: - Worker Kind: Helps the publisher know which set of workers to send the task to. - Task Kind: Helps the worker know which method to use to handle a task. If you're familiar with task queues, you're probably used to only specifying the task kind and not the worker kind. You can read about this design decision in the System Architecture section.
Now that our worker is ready to handle tasks, we can boot it up via its
entrypoint
method:
if __name__ == "__main__":
asyncio.run(worker_app.entrypoint())
The worker is running and ready to handle tasks. Now, let's publish some tasks for it to take care of!
We'll start by setting up the publisher and its configuration. The publisher's one and only responsibility is to publish tasks via the message broker so that the relay can register them and the worker can execute them.
Begin by creating a new file for the publisher and configuring it. Because the publisher must send tasks via the broker, it will also need the broker configuration.
from tacoq import PublisherClient, BrokerConfig
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)
With the publisher application created, we don't need to run an entrypoint. Instead, we will use the object to publish the task.
Let's publish a new task, wait for it to complete, and finally, retrieve the results:
from tacoq import PublisherClient, BrokerConfig
import asyncio
# Configure the publisher
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)
async def task_hello_world():
# Publish the task. It's important to save the object so that we can later
# retrieve the task's status and results based on its ID.
task = await publisher.publish_task(
worker_kind="my_worker_kind",
task_kind="task_hello_world",
input_data=TestInputPydanticModel(name="Pedro"),
)
Our task has now been published and is being worked on. But how do we retrieve the task's status and results?
When the worker is done with the task, it sends the results to the relay, who saves them in the database. The relay can be queried via REST for the task's current state.
To communicate with the relay, we can use the RelayClient
class. We can
configure it with the Relay's URL.
from tacoq import PublisherClient, BrokerConfig
from tacoq import RelayClient, RelayConfig,PydanticDecoder
# Configure the publisher
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)
# Configure the relay
relay_config = RelayConfig(url="http://localhost:3000")
relay_client = RelayClient(relay_config=relay_config)
async def task_hello_world():
# Publish the task. It's important to save the object so that we can later
# retrieve the task's status and results based on its ID.
task = await publisher.publish_task(
worker_kind="my_worker_kind",
task_kind="task_hello_world",
input_data=TestInputPydanticModel(name="Pedro"),
)
Now, we will use the RelayClient
to retrieve the task's results after
they are completed and decode the results into the correct object.
from tacoq import PublisherClient, BrokerConfig
from tacoq import RelayClient, RelayConfig,PydanticDecoder
import asyncio
# Configure the publisher
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)
# Configure the relay
relay_config = RelayConfig(url="http://localhost:3000")
relay_client = RelayClient(relay_config=relay_config)
async def task_hello_world():
# Publish the task. It's important to save the object so that we can later
# retrieve the task's status and results based on its ID.
task = await publisher.publish_task(
worker_kind="my_worker_kind",
task_kind="task_hello_world",
input_data=TestInputPydanticModel(name="Pedro"),
)
# Wait for the task to complete.
await asyncio.sleep(5)
# Fetch the current task state based on its ID.
completed_task = await relay_client.get_task(task.id)
# Decode the results into the correct result.
result = completed_task.get_decoded_output_data(
decoder=PydanticDecoder(TestOutputPydanticModel)
)
print(result)
if __name__ == "__main__":
asyncio.run(task_hello_world())
Congratulations! You've just published, executed, and retrieved a task using TacoQ. You can keep learning more about TacoQ in the Technical Reference section.
Last updated: 4/6/2025