Apothic Client Shared State and Coordination

Use queues, dictionaries, watches, claims, leases, locks, compare-and-set, and leader election to coordinate distributed work.

Last updated: 4/21/2026
API Version: v0.1.0

Queue and Dict let you coordinate distributed workers without building your own control plane.

Use them when you need:

  • a work queue
  • shared configuration or state
  • leader election
  • compare-and-set updates
  • live change streams

Common signatures#

The shared primitives are intentionally small and composable.

Queue#

Queue.from_name(name: str) -> Queue
Queue.create(name: str, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Queue
Queue.list(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Queue]
queue.delete(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.clear(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.length(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> int
queue.put(value: Any, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
queue.put_many(values: Iterable[Any], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Any]
queue.put_work(value: Any, *, max_attempts: int | None = 3, dead_letter_queue: str | None = None, metadata: dict[str, Any] | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
queue.get(*, default: Any = None, timeout_s: float | None = None, poll_interval_s: float = 1.0, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
queue.get_many(limit: int, *, timeout_s: float | None = None, poll_interval_s: float = 0.5, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Any]
queue.claim(*, claimer: str, lease_s: float = 60.0, timeout_s: float | None = None, poll_interval_s: float = 0.5, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueClaim | None
queue.claimed(*, claimer: str, lease_s: float = 60.0, timeout_s: float | None = None, poll_interval_s: float = 0.5, auto_renew: bool = True, renew_interval_s: float | None = None, ack_on_success: bool = True, release_on_error: bool = True, client: ControlPlaneClient | None = None, base_url: str | None = None) -> _QueueClaimContext
queue.claim_work(*, claimer: str, lease_s: float = 60.0, timeout_s: float | None = None, poll_interval_s: float = 0.5, dead_letter_queue: str | None = None, max_attempts: int | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueWorkItem | None
queue.ack(claims: QueueClaim | str | Iterable[QueueClaim | str], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.release(claims: QueueClaim | str | Iterable[QueueClaim | str], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.renew(claims: QueueClaim | str | Iterable[QueueClaim | str], *, lease_s: float, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.watch(*, last_index: int = -1, timeout_s: float | None = None, poll_interval_s: float = 0.5, snapshot: bool = False, client: ControlPlaneClient | None = None, base_url: str | None = None)

Dict#

Dict.from_name(name: str) -> Dict
Dict.create(name: str, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Dict
Dict.list(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Dict]
shared.delete(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictResponse
shared.clear(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictResponse
shared.length(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> int
shared.put(key: Any, value: Any, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
shared.update(values: dict[Any, Any] | Iterable[tuple[Any, Any]], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> dict[Any, Any]
shared.get(key: Any, *, default: Any = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
shared.get_many(keys: Iterable[Any], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> dict[Any, Any]
shared.items(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[tuple[Any, Any]]
shared.watch(*, last_index: int = -1, timeout_s: float | None = None, poll_interval_s: float = 0.5, snapshot: bool = False, client: ControlPlaneClient | None = None, base_url: str | None = None)
shared.acquire_lease(key: Any, *, holder: str, lease_s: float = 60.0, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictLease
shared.lease(key: Any, *, holder: str, lease_s: float = 60.0, auto_renew: bool = True, renew_interval_s: float | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> _DictLeaseContext
shared.release_lease(key: Any, *, lease_id: str, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictResponse
shared.renew_lease(key: Any, *, lease_id: str, lease_s: float, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictLease
shared.compare_and_set(key: Any, expected: Any, value: Any, *, lease_id: str | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictCompareAndSetResult
shared.locked(key: Any, *, holder: str, lease_s: float = 60.0, wait_timeout_s: float | None = None, poll_interval_s: float = 0.5, default_value: Any = True, auto_renew: bool = True, renew_interval_s: float | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> _DictLockContext
shared.try_become_leader(key: Any, *, holder: str, lease_s: float = 60.0, default_value: Any = True, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictLock | None

Queue basics#

Create a queue and add items:

from apothic import Queue

jobs = Queue.create("image-jobs")
jobs.put({"image_id": "img-1"})
jobs.put_many([{"image_id": "img-2"}, {"image_id": "img-3"}])

Read items back:

item = jobs.get(timeout_s=5.0, poll_interval_s=0.25)
batch = jobs.get_many(10, timeout_s=5.0, poll_interval_s=0.25)
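
To empty a queue in batches, one option is to call get_many in a loop until a batch comes back empty. The sketch below assumes get_many returns an empty list when nothing arrives before timeout_s elapses; the text above does not state this, so check it against your client version. The names drain and handle are hypothetical helpers, not part of the library.

```python
from typing import Any, Callable


def drain(queue: Any, handle: Callable[[Any], None], *, batch_size: int = 10) -> int:
    """Pull items in batches until a batch comes back empty; return how many were handled."""
    handled = 0
    while True:
        # Assumption: an empty list means nothing arrived within timeout_s.
        batch = queue.get_many(batch_size, timeout_s=5.0, poll_interval_s=0.25)
        if not batch:
            return handled
        for item in batch:
            handle(item)
            handled += 1
```

Call it as drain(jobs, process), where process is your own handler.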

Claim work instead of removing it immediately#

Use claims when work should be retryable: a claim holds an item under a lease until you acknowledge it, rather than removing it immediately:

claim = jobs.claim(claimer="worker-a", lease_s=60.0)
if claim is not None:
    print(claim.claim_id, claim.value)
    jobs.ack([claim])

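If work runs longer than expected, either renew the lease to keep the claim or release the claim to give the item up:

jobs.renew([claim], lease_s=120.0)  # keep working: renew the lease
jobs.release([claim])               # or give the claim up instead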
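
The claim, ack, and release calls can be combined into one helper that handles a single item. This is a minimal sketch: process_one and handler are hypothetical names, and it assumes release makes the item available to other workers again.

```python
from typing import Any, Callable


def process_one(jobs: Any, handler: Callable[[Any], None], *, claimer: str) -> bool:
    """Claim one item, ack it if handler succeeds, release it if handler raises."""
    claim = jobs.claim(claimer=claimer, lease_s=60.0, timeout_s=1.0)
    if claim is None:
        return False  # nothing was claimed before the timeout
    try:
        handler(claim.value)
    except Exception:
        # Assumption: releasing hands the item back for another attempt.
        jobs.release([claim])
        raise
    jobs.ack([claim])  # handler finished without error
    return True
```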

Use a context manager for workers#

For worker loops, the context manager is usually the cleanest path:

with jobs.claimed(claimer="worker-a", lease_s=60.0) as item:
    # item is None when nothing could be claimed
    if item is not None:
        process(item)

By default this will:

  • auto-renew while work is still running
  • acknowledge on success
  • release on error
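
A full worker loop built on the context manager might look like the sketch below. run_worker, handler, and max_idle_polls are hypothetical names; the loop stops after several consecutive empty polls so that it terminates in tests and batch jobs.

```python
from typing import Any, Callable


def run_worker(
    jobs: Any,
    handler: Callable[[Any], None],
    *,
    claimer: str,
    max_idle_polls: int = 3,
) -> int:
    """Process claimed items until max_idle_polls consecutive polls find nothing."""
    idle = 0
    processed = 0
    while idle < max_idle_polls:
        with jobs.claimed(claimer=claimer, lease_s=60.0, timeout_s=1.0) as item:
            if item is None:
                idle += 1  # nothing to claim this round
                continue
            handler(item)  # if this raises, the context manager releases the claim
            processed += 1
            idle = 0
    return processed
```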

Retry and dead-letter work#

When you want retry metadata to travel with the payload, enqueue work items explicitly:

jobs.put_work(
    {"image_id": "img-42"},
    max_attempts=3,
    dead_letter_queue="image-jobs-dead",
)

Claim and process them:

work = jobs.claim_work(claimer="worker-a", timeout_s=1.0)
if work is not None:
    try:
        process(work.payload)
        work.ack()
    except Exception as exc:
        work.fail(error=str(exc))

If retries remain, fail() re-enqueues the item. Once the retry budget (max_attempts) is exhausted, fail() can instead move the payload to the dead-letter queue.
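
The retry rule can be pictured with a small in-memory model. This is an illustration only, not the library's implementation: WorkItem, the attempts counter, and the "dropped" outcome when no dead-letter queue is set are all assumptions made for the sketch.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class WorkItem:
    payload: Any
    max_attempts: int = 3
    attempts: int = 0  # failed attempts recorded so far


def fail(item: WorkItem, queue: deque, dead_letters: Optional[deque]) -> str:
    """Record one failed attempt and route the item: retry, dead-letter, or drop."""
    item.attempts += 1
    if item.attempts < item.max_attempts:
        queue.append(item)  # retries remain: re-enqueue the item
        return "retried"
    if dead_letters is not None:
        dead_letters.append(item.payload)  # budget exhausted: keep the payload for inspection
        return "dead-lettered"
    return "dropped"  # assumption: with no dead-letter queue the item is discarded
```

In this model, max_attempts=3 allows two retries before the third failure sends the payload to the dead-letter queue. The real client may count attempts differently.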

Dict basics#

Dict is the shared key/value primitive:

from apothic import Dict

shared = Dict.create("shared-state")
shared.put("alpha", {"score": 1})
shared.update({"beta": {"score": 2}})
print(shared.get("alpha"))
print(shared.items())
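
A common use of Dict is shared configuration with local fallbacks. The sketch below relies only on get with its default argument; load_settings and the setting names in the usage line are hypothetical.

```python
from typing import Any, Mapping


def load_settings(shared: Any, defaults: Mapping[str, Any]) -> dict:
    """Read each key from the shared dict, falling back to the local default."""
    return {
        key: shared.get(key, default=fallback)
        for key, fallback in defaults.items()
    }
```

For example, load_settings(shared, {"batch_size": 10, "region": "local"}) returns the shared values where they exist and the defaults otherwise.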

Watch for changes#

Both primitives expose event streams.

Queue:

for event in jobs.watch(snapshot=True, timeout_s=0.0):
    print(event.event, event.data)

Dict:

for event in shared.watch(snapshot=True, timeout_s=0.0):
    print(event.event, event.data)

snapshot=True prepends the current state before the live stream starts.
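
Because both watch streams yield objects with event and data attributes, one helper can summarize either stream. The sketch below counts events by type; tally_events is a hypothetical name, and it makes no assumption about which event type names the client emits.

```python
from collections import Counter
from typing import Any, Iterable


def tally_events(events: Iterable[Any]) -> Counter:
    """Count watch events by their event type."""
    counts: Counter = Counter()
    for event in events:
        counts[event.event] += 1
    return counts
```

Pass it any watch stream, for example tally_events(shared.watch(snapshot=True, timeout_s=0.0)).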

Leases and locks on dict keys#

Use a lease when one worker needs temporary ownership of a key:

lease = shared.acquire_lease("alpha", holder="worker-a", lease_s=60.0)
shared.renew_lease("alpha", lease_id=lease.lease_id, lease_s=120.0)
shared.release_lease("alpha", lease_id=lease.lease_id)

Use the context manager when you want automatic renewal:

with shared.lease("alpha", holder="worker-a", lease_s=60.0) as lease:
    update_record(lease.current_value)
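
If you manage a lease by hand rather than through the context manager, wrap the work in try/finally so the lease is released even when the work fails. A minimal sketch, with run_with_lease and work as hypothetical names; it does not renew the lease, so the work should finish within lease_s.

```python
from typing import Any, Callable


def run_with_lease(shared: Any, key: Any, holder: str, work: Callable[[Any], Any]) -> Any:
    """Acquire a lease on key, run work(lease), and always release the lease."""
    lease = shared.acquire_lease(key, holder=holder, lease_s=60.0)
    try:
        return work(lease)
    finally:
        # Runs on success and on error, so the key is not held until expiry.
        shared.release_lease(key, lease_id=lease.lease_id)
```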

Compare-and-set updates#

Use compare-and-set when two workers should not overwrite each other blindly:

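result = shared.compare_and_set(
    "alpha",
    {"score": 1},  # expected current value
    {"score": 2},  # new value to write if the expectation holds
    # lease_id is optional; if you pass one, use a lease you still hold on "alpha"
    lease_id=lease.lease_id,
)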

if result.updated:
    print("update applied")
else:
    print("current value:", result.current_value)

Locks and leader election#

For higher-level coordination, Dict also supports locks:

with shared.locked("scheduler-lock", holder="node-a", lease_s=30.0):
    run_singleton_task()

Or explicit leader election:

leader = shared.try_become_leader("cluster-leader", holder="node-a", lease_s=30.0)
if leader is not None:
    try:
        run_control_loop()
    finally:
        leader.release()
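
When every node runs the same code, a small helper can branch between the leader and follower roles. This is a sketch under assumptions: run_if_leader, lead, and follow are hypothetical names, and because the try_become_leader signature shows no auto-renew option, lead should presumably finish within lease_s.

```python
from typing import Any, Callable


def run_if_leader(
    shared: Any,
    key: Any,
    holder: str,
    lead: Callable[[], None],
    follow: Callable[[], None],
) -> bool:
    """Run lead() if this node wins the election, otherwise follow(); return True if it led."""
    leader = shared.try_become_leader(key, holder=holder, lease_s=30.0)
    if leader is None:
        follow()  # another node holds leadership
        return False
    try:
        lead()
    finally:
        leader.release()  # give up leadership even if lead() raised
    return True
```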

Async APIs#

If your app is already async, use the async helpers:

item = await jobs.aio.get(timeout_s=5.0, poll_interval_s=0.25)
events = [event async for event in shared.aio.watch(snapshot=True, timeout_s=0.0)]
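
An async consumer built on jobs.aio.get could look like the following sketch. consume, handler, and max_items are hypothetical names, and the loop assumes that get returns its default (None) when nothing arrives before the timeout.

```python
from typing import Any, Awaitable, Callable


async def consume(
    jobs: Any,
    handler: Callable[[Any], Awaitable[None]],
    *,
    max_items: int,
) -> int:
    """Await up to max_items items from the queue; stop early on an empty poll."""
    handled = 0
    while handled < max_items:
        item = await jobs.aio.get(timeout_s=5.0, poll_interval_s=0.25)
        if item is None:
            break  # assumption: None means the timeout elapsed with no item
        await handler(item)
        handled += 1
    return handled
```

Run it from synchronous code with asyncio.run(consume(jobs, handler, max_items=100)).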

Pair coordination with storage and sandboxes#

These coordination primitives are often most useful when paired with other runtime resources:

  • use Queue to distribute work while a shared Volume or CloudBucketMount holds artifacts
  • use Dict locks or leader election to coordinate access to a vast_local cache
  • use sandboxes or stateful classes when a worker needs both shared coordination and sticky local state

Those storage and stateful-resource patterns are covered in the companion guide below.

What to learn next#