Apothic Client Shared State and Coordination
Use queues, dictionaries, watches, claims, leases, locks, compare-and-set, and leader election to coordinate distributed work.
Apothic Client Shared State and Coordination
Queue and Dict let you coordinate distributed workers without building your own control plane.
Use them when you need:
- a work queue
- shared configuration or state
- leader election
- compare-and-set updates
- live change streams
Common signatures
The shared primitives are intentionally small and composable.
Queue
Queue.from_name(name: str) -> Queue
Queue.create(name: str, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Queue
Queue.list(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Queue]
queue.delete(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.clear(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.length(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> int
queue.put(value: Any, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
queue.put_many(values: Iterable[Any], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Any]
queue.put_work(value: Any, *, max_attempts: int | None = 3, dead_letter_queue: str | None = None, metadata: dict[str, Any] | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
queue.get(*, default: Any = None, timeout_s: float | None = None, poll_interval_s: float = 1.0, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
queue.get_many(limit: int, *, timeout_s: float | None = None, poll_interval_s: float = 0.5, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Any]
queue.claim(*, claimer: str, lease_s: float = 60.0, timeout_s: float | None = None, poll_interval_s: float = 0.5, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueClaim | None
queue.claimed(*, claimer: str, lease_s: float = 60.0, timeout_s: float | None = None, poll_interval_s: float = 0.5, auto_renew: bool = True, renew_interval_s: float | None = None, ack_on_success: bool = True, release_on_error: bool = True, client: ControlPlaneClient | None = None, base_url: str | None = None) -> _QueueClaimContext
queue.claim_work(*, claimer: str, lease_s: float = 60.0, timeout_s: float | None = None, poll_interval_s: float = 0.5, dead_letter_queue: str | None = None, max_attempts: int | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueWorkItem | None
queue.ack(claims: QueueClaim | str | Iterable[QueueClaim | str], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.release(claims: QueueClaim | str | Iterable[QueueClaim | str], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.renew(claims: QueueClaim | str | Iterable[QueueClaim | str], *, lease_s: float, client: ControlPlaneClient | None = None, base_url: str | None = None) -> QueueResponse
queue.watch(*, last_index: int = -1, timeout_s: float | None = None, poll_interval_s: float = 0.5, snapshot: bool = False, client: ControlPlaneClient | None = None, base_url: str | None = None)
Dict
Dict.from_name(name: str) -> Dict
Dict.create(name: str, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Dict
Dict.list(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[Dict]
shared.delete(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictResponse
shared.clear(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictResponse
shared.length(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> int
shared.put(key: Any, value: Any, *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
shared.update(values: dict[Any, Any] | Iterable[tuple[Any, Any]], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> dict[Any, Any]
shared.get(key: Any, *, default: Any = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> Any
shared.get_many(keys: Iterable[Any], *, client: ControlPlaneClient | None = None, base_url: str | None = None) -> dict[Any, Any]
shared.items(*, client: ControlPlaneClient | None = None, base_url: str | None = None) -> list[tuple[Any, Any]]
shared.watch(*, last_index: int = -1, timeout_s: float | None = None, poll_interval_s: float = 0.5, snapshot: bool = False, client: ControlPlaneClient | None = None, base_url: str | None = None)
shared.acquire_lease(key: Any, *, holder: str, lease_s: float = 60.0, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictLease
shared.lease(key: Any, *, holder: str, lease_s: float = 60.0, auto_renew: bool = True, renew_interval_s: float | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> _DictLeaseContext
shared.release_lease(key: Any, *, lease_id: str, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictResponse
shared.renew_lease(key: Any, *, lease_id: str, lease_s: float, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictLease
shared.compare_and_set(key: Any, expected: Any, value: Any, *, lease_id: str | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictCompareAndSetResult
shared.locked(key: Any, *, holder: str, lease_s: float = 60.0, wait_timeout_s: float | None = None, poll_interval_s: float = 0.5, default_value: Any = True, auto_renew: bool = True, renew_interval_s: float | None = None, client: ControlPlaneClient | None = None, base_url: str | None = None) -> _DictLockContext
shared.try_become_leader(key: Any, *, holder: str, lease_s: float = 60.0, default_value: Any = True, client: ControlPlaneClient | None = None, base_url: str | None = None) -> DictLock | None
Queue basics
Create a queue and add items:
from apothic import Queue
jobs = Queue.create("image-jobs")
jobs.put({"image_id": "img-1"})
jobs.put_many([{"image_id": "img-2"}, {"image_id": "img-3"}])
Read items back:
item = jobs.get(timeout_s=5.0, poll_interval_s=0.25)
batch = jobs.get_many(10, timeout_s=5.0, poll_interval_s=0.25)
Claim work instead of removing it immediately
Claims are useful when you want retryable work with a lease:
claim = jobs.claim(claimer="worker-a", lease_s=60.0)
if claim is not None:
print(claim.claim_id, claim.value)
jobs.ack([claim])
Renew or release if work takes longer than expected:
jobs.renew([claim], lease_s=120.0)
jobs.release([claim])
Use a context manager for workers
For worker loops, the context manager is usually the cleanest path:
with jobs.claimed(claimer="worker-a", lease_s=60.0) as item:
if item is None:
return
process(item)
By default this will:
- auto-renew while work is still running
- acknowledge on success
- release on error
Retry and dead-letter work
When you want retry metadata to travel with the payload, enqueue work items explicitly:
jobs.put_work(
{"image_id": "img-42"},
max_attempts=3,
dead_letter_queue="image-jobs-dead",
)
Claim and process them:
work = jobs.claim_work(claimer="worker-a", timeout_s=1.0)
if work is not None:
try:
process(work.payload)
work.ack()
except Exception as exc:
work.fail(error=str(exc))
If retries remain, fail() re-enqueues the item. If the retry budget is exhausted, it can move the payload to the dead-letter queue.
Dict basics
Dict is the shared key/value primitive:
from apothic import Dict
shared = Dict.create("shared-state")
shared.put("alpha", {"score": 1})
shared.update({"beta": {"score": 2}})
print(shared.get("alpha"))
print(shared.items())
Watch for changes
Both primitives expose event streams.
Queue:
for event in jobs.watch(snapshot=True, timeout_s=0.0):
print(event.event, event.data)
Dict:
for event in shared.watch(snapshot=True, timeout_s=0.0):
print(event.event, event.data)
snapshot=True prepends the current state before the live stream starts.
Leases and locks on dict keys
Use a lease when one worker needs temporary ownership of a key:
lease = shared.acquire_lease("alpha", holder="worker-a", lease_s=60.0)
shared.renew_lease("alpha", lease_id=lease.lease_id, lease_s=120.0)
shared.release_lease("alpha", lease_id=lease.lease_id)
Use the context manager when you want automatic renewal:
with shared.lease("alpha", holder="worker-a", lease_s=60.0) as lease:
update_record(lease.current_value)
Compare-and-set updates
Use compare-and-set when two workers should not overwrite each other blindly:
result = shared.compare_and_set(
"alpha",
{"score": 1},
{"score": 2},
lease_id=lease.lease_id,
)
if result.updated:
print("update applied")
else:
print("current value:", result.current_value)
Locks and leader election
For higher-level coordination, Dict also supports locks:
with shared.locked("scheduler-lock", holder="node-a", lease_s=30.0):
run_singleton_task()
Or explicit leader election:
leader = shared.try_become_leader("cluster-leader", holder="node-a", lease_s=30.0)
if leader is not None:
try:
run_control_loop()
finally:
leader.release()
Async APIs
If your app is already async, use the async helpers:
item = await jobs.aio.get(timeout_s=5.0, poll_interval_s=0.25)
events = [event async for event in shared.aio.watch(snapshot=True, timeout_s=0.0)]
Pair coordination with storage and sandboxes
These coordination primitives are often most useful when paired with other runtime resources:
- use
Queueto distribute work while a sharedVolumeorCloudBucketMountholds artifacts - use
Dictlocks or leader election to coordinate access to avast_localcache - use sandboxes or stateful classes when a worker needs both shared coordination and sticky local state
Those storage and stateful-resource patterns are covered in the companion guide below.
