Pinion is a tiny, pluggable job queue and worker for Python. It provides a simple @task
registry, an in-memory queue for quick starts, and a durable SQLite backend for cross-process work, plus a retry policy with exponential backoff.
- In-memory queue with `Condition` coordination
- `Storage` protocol (SPI) for custom backends
- `@task` decorator (case-insensitive names)
- Job statuses: `PENDING`, `RUNNING`, `SUCCESS`, `FAILED`
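To illustrate what a case-insensitive `@task` registry can look like, here is a minimal standalone sketch. It does not use Pinion itself: `_REGISTRY` and `lookup` are hypothetical names chosen for this example, and Pinion's real internals may differ.

```python
from __future__ import annotations

from typing import Callable

# Hypothetical registry for illustration; not Pinion's actual data structure.
_REGISTRY: dict[str, Callable] = {}


def task(name: str | None = None):
    """Register a function under `name` (or its own name), lowercased."""
    def decorator(func: Callable) -> Callable:
        key = (name or func.__name__).lower()
        _REGISTRY[key] = func
        return func
    return decorator


def lookup(name: str) -> Callable:
    """Resolve a task by name, ignoring case."""
    return _REGISTRY[name.lower()]


@task()
def add(a: int, b: int) -> int:
    return a + b


@task("Boom")
def fail() -> None:
    raise ValueError("kaboom")


result = lookup("ADD")(1, 2)  # resolves to add() despite the different case
```

Normalizing the key once at registration and once at lookup keeps the rest of the code free of case handling.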
pip install -e .
pinion
- `pinion` prints a concise quickstart and an admin command cheatsheet, then exits.
- `pinion --demo` runs a tiny in-memory demo (adds 1+2).
- Admin commands: `status`, `running`, `pending`, `dlq-list`, `dlq-replay`, `enqueue`, `worker`.

Examples:
# Show queue summary for a DB
pinion status --db pinion.db
# Run a worker for 5s and import your task module for registration
pinion worker --db pinion.db --max-retries 2 --task-timeout 5 \
--import your_project.tasks --run-seconds 5
# Enqueue a job by name with JSON args/kwargs
pinion enqueue add --db pinion.db --args '[1,2]'
pip install mkdocs mkdocs-material
mkdocs serve --config-file docs/mkdocs-material.yml
Alternatively, run `cd docs`, then: `mkdocs serve -f mkdocs-material.yml`.
The docs are then served locally (by default at http://127.0.0.1:8000/).

import threading, time
from pinion import task, Job, InMemoryStorage, Worker, RetryPolicy
@task()
def add(a: int, b: int) -> int:
    return a + b
storage = InMemoryStorage()
worker = Worker(storage, retry=RetryPolicy(jitter=False), task_timeout=2.0)
thread = threading.Thread(target=worker.run_forever, daemon=True)
thread.start()
storage.enqueue(Job("add", (1, 2))) # args tuple
storage.enqueue(Job("BOOM")) # case-insensitive lookup (if registered)
time.sleep(2.5)
worker.stop()
thread.join()
# Optional: access basic metrics
print(worker.metrics)
import threading, time
from pinion import task, Job, Worker, RetryPolicy, SqliteStorage # durable backend
@task("boom")
def fail() -> None:
    raise ValueError("kaboom")
storage = SqliteStorage("pinion.db")
worker = Worker(storage, retry=RetryPolicy(jitter=False), task_timeout=2.0)
t = threading.Thread(target=worker.run_forever, daemon=True)
t.start()
storage.enqueue(Job("boom"))  # enqueue by the registered task name
time.sleep(4.5)
worker.stop()
t.join()
# Inspect DLQ (SQLite backend) for permanently failed jobs
# rows: (id, func_name, args_json, kwargs_json, attempts, error, failed_at)
print(storage._conn.execute("SELECT * FROM dlq").fetchall())
- Storage: `enqueue`, `dequeue`, `mark_done`, `mark_failed`, `size`, `heartbeat`, `reap_stale`, `dead_letter`
- Task registration: `@task`
- Retry policy: `max_retries`, `base_delay`, `cap`, optional jitter
- Metrics: `worker.metrics`
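The retry parameters listed above (`base_delay`, `cap`, optional jitter) suggest a capped exponential backoff. The sketch below shows one common way to compute such delays. The function name `backoff_delay`, the default values, and the "full jitter" strategy are assumptions for illustration; the README does not state Pinion's exact formula.

```python
from __future__ import annotations

import random


def backoff_delay(
    attempt: int,
    base_delay: float = 0.5,   # assumed default, for illustration only
    cap: float = 30.0,         # assumed default, for illustration only
    jitter: bool = False,
    rng: random.Random | None = None,
) -> float:
    """Delay before retry number `attempt` (1-based), doubling each time up to `cap`."""
    delay = min(cap, base_delay * 2 ** (attempt - 1))
    if jitter:
        # "Full jitter": pick uniformly between 0 and the capped delay,
        # which spreads out retries from many workers.
        delay = (rng or random).uniform(0.0, delay)
    return delay


# Without jitter the schedule is deterministic: 0.5, 1.0, 2.0, 4.0, 4.0
delays = [backoff_delay(n, base_delay=0.5, cap=4.0) for n in range(1, 6)]
```

Disabling jitter, as the examples in this README do with `RetryPolicy(jitter=False)`, makes timing predictable for demos and tests.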
Implement the `Storage` protocol to plug in your own backend (e.g., Redis, DB, file-based):
from pinion import Job

class MyStorage:
    def enqueue(self, job: Job) -> None: ...
    def dequeue(self, timeout: float | None = None) -> Job | None: ...
    def mark_done(self, job: Job) -> None: ...
    def mark_failed(self, job: Job, exc: Exception) -> None: ...
    def size(self) -> int: ...
    def heartbeat(self, job: Job) -> None: ...
    def reap_stale(self, visibility_timeout: float) -> int: ...
    def dead_letter(self, job: Job, exc: Exception) -> None: ...
`dequeue` should block until the timeout expires or a job is available, mark the job `RUNNING`, and increment `attempts` before returning the job.
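As a rough illustration of the `dequeue` contract described above, the following standalone sketch uses a `threading.Condition` to block until a job arrives or the timeout expires. `SimpleJob` and `ConditionQueue` are hypothetical stand-ins, not Pinion's `Job` or `InMemoryStorage`, and returning `None` on timeout is inferred from the `Job | None` return type.

```python
from __future__ import annotations

import threading
from collections import deque
from dataclasses import dataclass


@dataclass
class SimpleJob:
    """Hypothetical stand-in for pinion.Job, reduced to the fields used here."""
    func_name: str
    args: tuple = ()
    status: str = "PENDING"
    attempts: int = 0


class ConditionQueue:
    """Minimal producer/consumer queue coordinated by a Condition."""

    def __init__(self) -> None:
        self._jobs: deque[SimpleJob] = deque()
        self._cond = threading.Condition()

    def enqueue(self, job: SimpleJob) -> None:
        with self._cond:
            self._jobs.append(job)
            self._cond.notify()  # wake one waiting consumer

    def dequeue(self, timeout: float | None = None) -> SimpleJob | None:
        with self._cond:
            # wait_for re-checks the predicate after each wakeup and
            # returns a falsy value if the timeout expires first.
            if not self._cond.wait_for(lambda: bool(self._jobs), timeout):
                return None
            job = self._jobs.popleft()
            job.status = "RUNNING"
            job.attempts += 1
            return job


queue = ConditionQueue()
empty = queue.dequeue(timeout=0.05)        # nothing queued yet
queue.enqueue(SimpleJob("add", (1, 2)))
claimed = queue.dequeue(timeout=1.0)       # returns immediately with the job
```

Updating `status` and `attempts` while still holding the lock means no other consumer can observe the job in a half-claimed state.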
- `InMemoryStorage` uses a `Condition` for coordinating producers/consumers.
- `SqliteStorage` uses WAL mode and an atomic claim (`BEGIN IMMEDIATE` + `UPDATE`) to safely select a `PENDING` job across processes. Access is serialized with a lock.
- `Worker` uses a `JobExecution` context manager to mark success/failure and provides a thread-based per-task timeout option.
- Permanently failed jobs go to a dead-letter queue (SQLite backend: `dlq` table).
- The `Storage` protocol includes `dead_letter(job, exc)`; custom backends must implement it.

Pinion targets Python 3.12+ and is published as `pinion-queue`.
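The atomic claim mentioned above for `SqliteStorage` (WAL mode, `BEGIN IMMEDIATE` + `UPDATE`) can be sketched with the standard `sqlite3` module. The `jobs` table layout and the helper names `open_db` and `claim_next` are assumptions made for this example; Pinion's actual schema and code may differ.

```python
import os
import sqlite3
import tempfile


def open_db(path: str) -> sqlite3.Connection:
    # isolation_level=None disables implicit transactions, so BEGIN/COMMIT
    # below are issued explicitly.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    # Hypothetical schema, for illustration only.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " func_name TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'PENDING',"
        " attempts INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def claim_next(conn: sqlite3.Connection):
    """Claim the oldest PENDING job, or return None if there is none."""
    # BEGIN IMMEDIATE takes the write lock up front, so two processes
    # cannot both select the same PENDING row.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, func_name FROM jobs"
            " WHERE status = 'PENDING' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE jobs SET status = 'RUNNING', attempts = attempts + 1"
                " WHERE id = ?",
                (row[0],),
            )
        conn.execute("COMMIT")
        return row
    except Exception:
        conn.execute("ROLLBACK")
        raise


db_path = os.path.join(tempfile.mkdtemp(), "sketch.db")
conn = open_db(db_path)
conn.execute("INSERT INTO jobs (func_name) VALUES ('add')")
first = claim_next(conn)    # claims the only pending job
second = claim_next(conn)   # nothing left to claim
```

Selecting and updating inside one immediate transaction is what makes the claim safe across processes; a plain `SELECT` followed by a separate `UPDATE` would leave a window for a second worker to pick the same row.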
Suggested versioning for this release: bump to `0.2.0`.
1) Update version
- `pyproject.toml`: set `version = "0.2.0"`.
- `pinion/__init__.py`: set `__version__ = "0.2.0"`.

2) Build distributions
python -m pip install --upgrade pip build twine
python -m build
twine check dist/*
3) Publish to PyPI
# Set PYPI token in environment (from your PyPI account)
export TWINE_USERNAME="__token__"
export TWINE_PASSWORD="pypi-***your-token***"
twine upload dist/*
4) Tag the release
git tag v0.2.0
git push --tags
If `pinion-queue` is not available on PyPI, choose an alternative name or organization namespace.
- `pinion/queue.py`: core queue, worker, storages, demo tasks
- `pinion/cli.py`: simple CLI demo (`pinion`)

pip install -U pinion-queue
pipx upgrade pinion-queue
poetry update pinion-queue
The CLI performs a lightweight update check (with a short timeout) and prints a hint if a newer version is available. Disable it via `PINION_NO_UPDATE_CHECK=1` or `pinion --no-update-check`.
See the Changelog for release notes: `CHANGELOG.md`.
Pinion aims to be a tiny, understandable foundation you can extend with a real storage backend and operational features as needed.