Quickstart

Setup

Get TacoQ up and running on your project using Docker and the Python SDK.

This section assumes you have a basic understanding of the core concepts of task queues and TacoQ. Read the Core Concepts section if you haven't already.

Prerequisites

You'll need Docker (with Docker Compose) to run the infrastructure and Python to use the SDK. We recommend using UV to run Python projects.

Infrastructure

TacoQ requires Postgres, RabbitMQ, and the Relay to be running. Let's start by creating a docker-compose.yml file to launch them:

services:
  relay:
    image: ghcr.io/taco-xyz/tacoq-relay:latest
    ports:
      - "3000:3000"
    depends_on:
      rabbitmq:
        condition: service_healthy
      postgres:
        condition: service_healthy
    environment:
      TACOQ_DATABASE_URL: postgresql://user:password@postgres:5432/tacoq
      TACOQ_BROKER_URL: amqp://user:password@rabbitmq:5672

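  rabbitmq:
    image: rabbitmq:4-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: user
      RABBITMQ_DEFAULT_PASS: password
    # Needed because the relay waits on `condition: service_healthy`.
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 5s
      timeout: 5s
      retries: 5

  postgres:
    image: postgres:latest
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: tacoq
    ports:
      - "5432:5432"
    # Needed because the relay waits on `condition: service_healthy`.
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d tacoq"]
      interval: 5s
      timeout: 5s
      retries: 5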

Run docker compose up to start the services, and we're ready to go!
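If you want to confirm that the services are listening before moving on, a small script can probe the ports published by the compose file. This is a minimal sketch using only the Python standard library; the port_open helper is a hypothetical name, and a successful TCP connection only shows that the port is open, not that the service has finished starting.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Host ports published by the docker-compose.yml in this quickstart.
SERVICES = {"relay": 3000, "rabbitmq": 5672, "postgres": 5432}

if __name__ == "__main__":
    for name, port in SERVICES.items():
        status = "reachable" if port_open("localhost", port) else "not reachable"
        print(f"{name} (localhost:{port}): {status}")
```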

Client

Worker

With the infrastructure running, we want to create a worker that can execute tasks. Let's start by installing the TacoQ Python SDK:

pip install tacoq

or, for UV users:

uv init
uv add tacoq

The worker must know how to receive new task assignments and send updates through the broker. We'll begin by creating a new script with the worker application.

To start, configure the broker so that the worker knows where to receive tasks from and where to send updates.

from tacoq import BrokerConfig

broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")

Next, we'll configure the worker application.

from tacoq import BrokerConfig
from tacoq import WorkerApplication, WorkerApplicationConfig

broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
worker_config = WorkerApplicationConfig(
    name="my_worker_1", # The name of the worker for observability purposes.
    kind="my_worker_kind", # The kind of worker to route tasks to.
    broker_config=broker_config,
    broker_prefetch_count=5, # How many tasks to prefetch and execute concurrently.
)

worker_app = WorkerApplication(config=worker_config)
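To build intuition for what a concurrency limit such as broker_prefetch_count=5 means, the sketch below runs twelve fake tasks while allowing at most five in flight at once. It uses only asyncio and is not how TacoQ implements prefetching; run_with_limit and fake_task are hypothetical names used for illustration.

```python
import asyncio


async def run_with_limit(jobs, limit: int):
    """Run coroutine factories with at most `limit` in flight.

    Returns the results in submission order and the peak concurrency observed.
    """
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def guarded(job):
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await job()
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, peak


async def fake_task(n: int) -> int:
    # Stand-in for real work.
    await asyncio.sleep(0.01)
    return n * 2


if __name__ == "__main__":
    jobs = [lambda n=n: fake_task(n) for n in range(12)]
    results, peak = asyncio.run(run_with_limit(jobs, limit=5))
    print(f"ran {len(results)} tasks, peak concurrency {peak}")
```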

Note that the worker field kind is set to "my_worker_kind". This field will be used by the publisher to know which set of workers to send the task to. We recommend using environment variables to align these values.
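One way to keep these values aligned is to read them from the environment in both the worker and the publisher scripts. The sketch below assumes two hypothetical variable names, WORKER_KIND and BROKER_URL, and falls back to the values used in this quickstart; neither name is defined by TacoQ.

```python
import os

# Defaults match the values used throughout this quickstart.
DEFAULT_WORKER_KIND = "my_worker_kind"
DEFAULT_BROKER_URL = "amqp://user:password@localhost:5672"


def load_settings(env=None) -> dict:
    """Read shared settings from the environment, falling back to the defaults."""
    env = os.environ if env is None else env
    return {
        # WORKER_KIND and BROKER_URL are hypothetical variable names.
        "worker_kind": env.get("WORKER_KIND", DEFAULT_WORKER_KIND),
        "broker_url": env.get("BROKER_URL", DEFAULT_BROKER_URL),
    }


if __name__ == "__main__":
    settings = load_settings()
    print(settings["worker_kind"])
```

The worker would pass settings["worker_kind"] as kind in WorkerApplicationConfig, and the publisher would pass the same value as worker_kind when publishing.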

The worker application has been created, but it doesn't know how to handle any tasks that come its way. So, let's teach it to handle a task.

We'll create some models for the task input and output. These will be used throughout our application.

from pydantic import BaseModel

# Create Pydantic models for the task input and output.

class TestInputPydanticModel(BaseModel):
    name: str

class TestOutputPydanticModel(BaseModel):
    message: str

Then, we'll create a handler function for the task.

from tacoq import WorkerApplication, WorkerApplicationConfig, BrokerConfig
from pydantic import BaseModel
import asyncio

broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
worker_config = WorkerApplicationConfig(
    name="my_worker_1", # The name of the worker for observability purposes.
    kind="my_worker_kind", # The kind of worker to route tasks to.
    broker_config=broker_config,
    broker_prefetch_count=5, # How many tasks to prefetch and execute concurrently.
)

worker_app = WorkerApplication(config=worker_config)

# Models

class TestInputPydanticModel(BaseModel):
    name: str

class TestOutputPydanticModel(BaseModel):
    message: str

# Define the task handler

@worker_app.task(kind="task_hello_world")
async def task_hello_world(input_data: TestInputPydanticModel) -> TestOutputPydanticModel:
    # Extract the input data.
    name = input_data.name

    # Simulate some work.
    await asyncio.sleep(3)

    # The results are automatically encoded into bytes and sent back to the broker
    return TestOutputPydanticModel(
        message=f"Hello, {name}!",
    )

Before a task's input and output data reach the broker, they must be encoded as bytes.

TacoQ provides a default encoder and decoder that can handle JSON objects with the language's most commonly used libraries. These are well tested and should work for most use cases.

  • Python: pydantic
  • Rust: serde_json

You can also write your own encoders and decoders. Simply peek into the source code of the default ones and implement the required methods.
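Conceptually, an encoder turns an object into bytes and a decoder reverses the process. The sketch below shows that round trip with the standard library only. It does not implement TacoQ's encoder or decoder interface, whose required methods are defined in the source code; HelloOutput, encode, and decode are hypothetical names.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class HelloOutput:
    message: str


def encode(obj: HelloOutput) -> bytes:
    """Serialize the object to UTF-8 encoded JSON bytes."""
    return json.dumps(asdict(obj)).encode("utf-8")


def decode(data: bytes) -> HelloOutput:
    """Rebuild the object from UTF-8 encoded JSON bytes."""
    return HelloOutput(**json.loads(data.decode("utf-8")))


if __name__ == "__main__":
    payload = encode(HelloOutput(message="Hello, Pedro!"))
    print(payload)
    print(decode(payload))
```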

Note the task field kind is set to "task_hello_world". You can think about it the following way:

  • Worker Kind: Helps the publisher know which set of workers to send the task to.
  • Task Kind: Helps the worker know which method to use to handle a task.

If you're familiar with task queues, you're probably used to only specifying the task kind and not the worker kind. You can read about this design decision in the System Architecture section.
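The two kinds can be pictured as a two-level lookup: first pick the group of workers, then pick the handler within that group. The sketch below models this with plain dictionaries. It is an illustration of the idea only; in TacoQ the first step happens in the broker and the second inside the worker, and REGISTRY and dispatch are hypothetical names.

```python
from typing import Callable, Dict

# worker kind -> task kind -> handler
Registry = Dict[str, Dict[str, Callable[[str], str]]]


def hello(name: str) -> str:
    return f"Hello, {name}!"


REGISTRY: Registry = {"my_worker_kind": {"task_hello_world": hello}}


def dispatch(registry: Registry, worker_kind: str, task_kind: str, payload: str) -> str:
    """Resolve the worker group first, then the handler within that group."""
    handlers = registry.get(worker_kind)
    if handlers is None:
        raise LookupError(f"no workers of kind {worker_kind!r}")
    handler = handlers.get(task_kind)
    if handler is None:
        raise LookupError(f"workers of kind {worker_kind!r} cannot handle {task_kind!r}")
    return handler(payload)


if __name__ == "__main__":
    print(dispatch(REGISTRY, "my_worker_kind", "task_hello_world", "Pedro"))
```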

Now that our worker is ready to handle tasks, we can boot it up via its entrypoint method:

if __name__ == "__main__":
    asyncio.run(worker_app.entrypoint())

The worker is running and ready to handle tasks. Now, let's publish some tasks for it to take care of!

Publisher Client

We'll start by setting up the publisher and its configuration. The publisher's one and only responsibility is to publish tasks via the message broker so that the relay can register them and the worker can execute them.

Begin by creating a new file for the publisher and configuring it. Because the publisher must send tasks via the broker, it will also need the broker configuration.

from tacoq import PublisherClient, BrokerConfig

broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)

With the publisher application created, we don't need to run an entrypoint. Instead, we will use the object to publish the task.

Let's publish a new task, wait for it to complete, and finally, retrieve the results:

from tacoq import PublisherClient, BrokerConfig
from pydantic import BaseModel
import asyncio

# Same input model as in the worker script.
class TestInputPydanticModel(BaseModel):
    name: str

# Configure the publisher
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)

async def task_hello_world():
    # Publish the task. It's important to save the object so that we can later
    # retrieve the task's status and results based on its ID.
    task = await publisher.publish_task(
        worker_kind="my_worker_kind",
        task_kind="task_hello_world",
        input_data=TestInputPydanticModel(name="Pedro"),
    )

Our task has now been published and is being worked on. But how do we retrieve the task's status and results?

Relay Client

When the worker is done with the task, it sends the results to the relay, which saves them in the database. The relay can be queried via REST for the task's current state.

To communicate with the relay, we can use the RelayClient class. We can configure it with the Relay's URL.

from tacoq import PublisherClient, BrokerConfig
from tacoq import RelayClient, RelayConfig, PydanticDecoder
from pydantic import BaseModel

# Same input model as in the worker script.
class TestInputPydanticModel(BaseModel):
    name: str

# Configure the publisher
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)

# Configure the relay
relay_config = RelayConfig(url="http://localhost:3000")
relay_client = RelayClient(relay_config=relay_config)

async def task_hello_world():
    # Publish the task. It's important to save the object so that we can later
    # retrieve the task's status and results based on its ID.
    task = await publisher.publish_task(
        worker_kind="my_worker_kind",
        task_kind="task_hello_world",
        input_data=TestInputPydanticModel(name="Pedro"),
    )

Now, we will use the RelayClient to retrieve the task once it has completed and decode its output data into the correct object.

from tacoq import PublisherClient, BrokerConfig
from tacoq import RelayClient, RelayConfig, PydanticDecoder
from pydantic import BaseModel
import asyncio

# Same input and output models as in the worker script.
class TestInputPydanticModel(BaseModel):
    name: str

class TestOutputPydanticModel(BaseModel):
    message: str

# Configure the publisher
broker_config = BrokerConfig(url="amqp://user:password@localhost:5672")
publisher = PublisherClient(broker_config=broker_config)

# Configure the relay
relay_config = RelayConfig(url="http://localhost:3000")
relay_client = RelayClient(relay_config=relay_config)

async def task_hello_world():
    # Publish the task. It's important to save the object so that we can later
    # retrieve the task's status and results based on its ID.
    task = await publisher.publish_task(
        worker_kind="my_worker_kind",
        task_kind="task_hello_world",
        input_data=TestInputPydanticModel(name="Pedro"),
    )

    # Wait for the task to complete (the handler sleeps for 3 seconds).
    await asyncio.sleep(5)

    # Fetch the current task state based on its ID.
    completed_task = await relay_client.get_task(task.id)

    # Decode the output data into the expected model.
    result = completed_task.get_decoded_output_data(
        decoder=PydanticDecoder(TestOutputPydanticModel)
    )

    print(result)

if __name__ == "__main__":
    asyncio.run(task_hello_world())
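The fixed asyncio.sleep(5) works here because the handler is known to take about three seconds. For tasks of unknown duration, a polling loop with a timeout is a common alternative. The sketch below is a generic helper built on asyncio alone; poll_until is a hypothetical name, and the fake fetch function stands in for a call to the relay.

```python
import asyncio
import time
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def poll_until(
    fetch: Callable[[], Awaitable[T]],
    is_done: Callable[[T], bool],
    timeout: float = 30.0,
    interval: float = 0.5,
) -> T:
    """Call fetch() repeatedly until is_done(result) is true or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = await fetch()
        if is_done(result):
            return result
        # Give up if the next attempt would start after the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        await asyncio.sleep(interval)


if __name__ == "__main__":
    # Fake state sequence standing in for successive relay responses.
    states = iter(["pending", "pending", "completed"])

    async def fake_fetch() -> str:
        return next(states)

    final = asyncio.run(
        poll_until(fake_fetch, lambda state: state == "completed", interval=0.01)
    )
    print(final)
```

In the publisher script, fetch could be lambda: relay_client.get_task(task.id). The is_done check depends on how the returned task reports completion, which this quickstart does not show, so that part is left as an assumption to fill in from the Technical Reference.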

Congratulations! You've just published, executed, and retrieved a task using TacoQ. You can keep learning more about TacoQ in the Technical Reference section.

Last updated: 4/6/2025

© 2025 Taco Division.
All rights reserved.