Pinion

Pinion is a tiny, pluggable job queue and worker for Python. It provides a simple @task registry, an in-memory queue for quick starts, and a durable SQLite backend for cross-process work, plus a retry policy with exponential backoff.

Features

Requirements

Installation

Quick Start

CLI

Examples:

# Show queue summary for a DB
pinion status --db pinion.db

# Run a worker for 5s and import your task module for registration
pinion worker --db pinion.db --max-retries 2 --task-timeout 5 \
  --import your_project.tasks --run-seconds 5

# Enqueue a job by name with JSON args/kwargs
pinion enqueue add --db pinion.db --args '[1,2]'

Documentation

Library usage (in-memory)

import threading, time
from pinion import task, Job, InMemoryStorage, Worker, RetryPolicy

@task()
def add(a: int, b: int) -> int:
    return a + b

storage = InMemoryStorage()
worker = Worker(storage, retry=RetryPolicy(jitter=False), task_timeout=2.0)
thread = threading.Thread(target=worker.run_forever, daemon=True)
thread.start()

storage.enqueue(Job("add", (1, 2)))   # args tuple
storage.enqueue(Job("ADD", (3, 4)))   # task names are looked up case-insensitively

time.sleep(2.5)
worker.stop()
thread.join()
# Optional: access basic metrics
print(worker.metrics)
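
The RetryPolicy(jitter=False) argument above turns off randomization of retry delays. As a rough illustration of what exponential backoff with optional jitter usually looks like, here is a minimal sketch. The helper name backoff_delay and its parameters (base, cap) are hypothetical and are not Pinion's RetryPolicy API; check the library source for the actual fields and defaults.

```python
import random


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0,
                  jitter: bool = True) -> float:
    """Delay in seconds before retry number `attempt` (1-based).

    Hypothetical helper: names and defaults are illustrative only,
    not taken from Pinion.
    """
    # Double the delay on each attempt, but never exceed the cap.
    delay = min(cap, base * (2 ** (attempt - 1)))
    if jitter:
        # "Full jitter": pick uniformly in [0, delay] so that many workers
        # retrying at once do not all wake at the same instant.
        delay = random.uniform(0.0, delay)
    return delay


# Without jitter the schedule is deterministic.
schedule = [backoff_delay(n, jitter=False) for n in range(1, 6)]
print(schedule)  # [0.5, 1.0, 2.0, 4.0, 8.0]
```

Disabling jitter, as the examples here do, makes retry timing predictable, which is convenient in tests and demos; in production, jitter is generally worth keeping.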

Library usage (SQLite)

import threading, time
from pinion import task, Job, Worker, RetryPolicy, SqliteStorage  # durable backend

@task("boom")
def fail() -> None:
    raise ValueError("kaboom")

storage = SqliteStorage("pinion.db")
worker = Worker(storage, retry=RetryPolicy(jitter=False), task_timeout=2.0)
t = threading.Thread(target=worker.run_forever, daemon=True)
t.start()

storage.enqueue(Job("boom"))  # use the registered task name ("boom"), not the function name

time.sleep(4.5)
worker.stop()
t.join()

# Inspect DLQ (SQLite backend) for permanently failed jobs
# rows: (id, func_name, args_json, kwargs_json, attempts, error, failed_at)
print(storage._conn.execute("SELECT * FROM dlq").fetchall())
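
The snippet above reaches into the private _conn attribute. As an alternative sketch, the same table can be read with the standard sqlite3 module through a separate connection. The table name dlq and the column names come from the comment above and should be treated as assumptions about the schema; read_dlq is a hypothetical helper, not part of Pinion.

```python
import sqlite3


def read_dlq(conn: sqlite3.Connection) -> list[dict]:
    """Return each row of the dlq table as a dict keyed by column name."""
    cur = conn.execute("SELECT * FROM dlq")
    # cursor.description holds one 7-tuple per column; item 0 is the name.
    columns = [col[0] for col in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]


# Usage against the database file from the example above:
#   conn = sqlite3.connect("pinion.db")
#   for entry in read_dlq(conn):
#       print(entry["func_name"], entry["attempts"], entry["error"])
#   conn.close()
```

Reading column names from the cursor rather than hard-coding them keeps the helper working if the schema gains or reorders columns.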

Core Concepts

Extending Storage

Implement the Storage protocol to plug in your own backend (e.g., Redis, DB, file-based):

class MyStorage:
    def enqueue(self, job: Job) -> None: ...
    def dequeue(self, timeout: float | None = None) -> Job | None: ...
    def mark_done(self, job: Job) -> None: ...
    def mark_failed(self, job: Job, exc: Exception) -> None: ...
    def size(self) -> int: ...
    def heartbeat(self, job: Job) -> None: ...
    def reap_stale(self, visibility_timeout: float) -> int: ...
    def dead_letter(self, job: Job, exc: Exception) -> None: ...

dequeue should block until a job is available or the timeout elapses (returning None on timeout, per the signature above). When it does return a job, it must first mark the job RUNNING and increment its attempts.

Design Notes

Limitations

Breaking Changes (since 0.1.x)

Release and Publishing

Pinion targets Python 3.12+ and is published as pinion-queue.

Suggested versioning for this release: bump to 0.2.0.

1) Update version

2) Build distributions

python -m pip install --upgrade pip build twine
python -m build
twine check dist/*

3) Publish to PyPI

# Set your PyPI API token in the environment (create one in your PyPI account settings)
export TWINE_USERNAME="__token__"
export TWINE_PASSWORD="pypi-***your-token***"

twine upload dist/*

4) Tag the release

git tag v0.2.0
git push --tags

If the name pinion-queue is already taken on PyPI, choose an alternative name or organization namespace.

Project Layout

Upgrading

The CLI performs a lightweight update check (with a short timeout) and prints a hint if a newer version is available. To disable it, set PINION_NO_UPDATE_CHECK=1 or pass --no-update-check to pinion.

See the Changelog for release notes: CHANGELOG.md.


Pinion aims to be a tiny, understandable foundation you can extend with a real storage backend and operational features as needed.