Running taskiq with 1 worker doesn't process any background tasks or scheduled tasks

Open avivansh opened this issue 5 months ago • 0 comments

I am running my taskiq worker as:

poetry run taskiq worker src.jobs.broker:broker src.jobs -w 1 --no-configure-logging

My broker configuration:

"""TaskIQ broker configuration with PostgreSQL backend."""

import logging

from taskiq import TaskiqScheduler
from taskiq_postgresql import PostgresqlBroker, PostgresqlResultBackend
from taskiq_postgresql.scheduler_source import PostgresqlSchedulerSource

from src.config.settings import settings

logger = logging.getLogger(__name__)

# Connection pool configuration to prevent exhausting database connections
# These settings are specific to asyncpg connection pools
POOL_CONFIG = {
    "min_size": 2,  # Minimum number of connections to maintain
    "max_size": 5,  # Maximum number of connections allowed
    "max_inactive_connection_lifetime": 300,  # Close idle connections after 5 minutes
}

# Connection configuration for individual connections (not pools)
# These can be passed to asyncpg.connect() calls
CONNECTION_CONFIG = {
    # Add any connection-specific settings here if needed
    # For example: "server_settings": {"jit": "off"}
}

# Configure result backend for storing task results
result_backend = PostgresqlResultBackend(
    dsn=settings.postgres.url,
    # Result backend manages its own connection pool
)

# Configure broker with asyncpg driver for better async performance
# The broker needs both connection_kwargs (for listen driver) and pool_kwargs (for main pool)
broker = PostgresqlBroker(
    dsn=settings.postgres.url,
    connection_kwargs=CONNECTION_CONFIG,  # For individual connections (listen driver)
    pool_kwargs=POOL_CONFIG,  # For the main connection pool
).with_result_backend(result_backend)

# Configure scheduler source for cron jobs
scheduler_source = PostgresqlSchedulerSource(
    dsn=settings.postgres.url,
    # Scheduler manages its own connections
)

# Create scheduler instance
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[scheduler_source],
)

logger.info("TaskIQ broker configured with PostgreSQL backend")

Logs for cron:

Started with pid 73437..
cron | [2025-09-01 08:00:20,055][INFO   ][broker:<module>:54] TaskIQ broker configured with PostgreSQL backend
cron | [2025-09-01 08:00:20,060][WARNING][base_events:_run_once:1994] Executing <Task pending name='Task-1' coro=<run_scheduler() running at /.cache/venv/lib/python3.12/site-packages/taskiq/cli/scheduler/run.py:266> wait_for=<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/lib/python3.12/asyncio/futures.py:389, Task.task_wakeup()] created at /usr/local/lib/python3.12/asyncio/base_events.py:448> cb=[_run_until_complete_cb() at /usr/local/lib/python3.12/asyncio/base_events.py:181] created at /usr/local/lib/python3.12/asyncio/runners.py:100> took 0.153 seconds
cron | [2025-09-01 08:00:20,172][INFO   ][run:run_scheduler:272] Starting scheduler.
cron | [2025-09-01 08:00:20,479][INFO   ][run:run_scheduler:274] Startup completed.
cron | [2025-09-01 08:05:01,005][INFO   ][run:delayed_send:138] Sending task src.jobs.cron.example_cron:health_check with schedule_id fbda2152-6039-513e-b903-f1b3eeb26295.

Logs for worker:

Started with pid 73440..

Can't taskiq be run with 1 worker?
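
One thing that may be worth ruling out before concluding the worker is idle: the worker command above passes `--no-configure-logging`, so taskiq does not set up logging in that process, and Python's root logger defaults to WARNING with no handler attached. The empty worker log could therefore partly be a logging artifact rather than proof that nothing runs. The sketch below uses only the standard library to show the effect; `configure_worker_logging` is a hypothetical helper name, not part of taskiq, and the format string is only an approximation of the one seen in the cron logs.

```python
import io
import logging


def configure_worker_logging(stream=None, level=logging.INFO):
    """Attach one stream handler to the root logger and lower its level.

    Hypothetical helper (not a taskiq API): intended to be called once,
    e.g. at import time of the broker module, when the worker is started
    with --no-configure-logging.
    """
    handler = logging.StreamHandler(stream)  # stream=None means stderr
    handler.setFormatter(
        logging.Formatter("[%(asctime)s][%(levelname)-7s][%(name)s] %(message)s")
    )
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)
    return handler


# Demonstration: capture output in memory instead of writing to stderr.
buffer = io.StringIO()
demo_logger = logging.getLogger("src.jobs.broker")

# Before configuration the root level is WARNING, so this record is dropped.
demo_logger.info("dropped: logging is not configured yet")

configure_worker_logging(buffer)

# After configuration the same logger emits INFO records via the root handler.
demo_logger.info("TaskIQ broker configured with PostgreSQL backend")

captured = buffer.getvalue()
```

If this is the cause, either calling such a helper in `src/jobs/broker.py` or dropping `--no-configure-logging` from the worker command should make the worker's startup and task-execution messages visible. Whether tasks are actually being picked up would then be easier to judge.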

avivansh, Sep 01 '25 08:09