Apothic Client Functions and Jobs

Learn when to use remote calls, background jobs, async APIs, retries, batching, execution images, and flexible placement filters.

Last updated: 4/23/2026
API Version: v0.1.0
apothic-clientfunctionsjobsasync

Apothic Client Functions and Jobs#

Most Apothic apps start with one or more @app.function(...) callables. They are the core unit for remote execution.

A minimal function#

from apothic import App

app = App("reporting")


@app.function(cpu=1, memory_mb=1024, timeout_s=300)
def summarize(text: str) -> str:
    return text[:120]

Core signatures#

The function decorator carries both execution settings and packaging settings:

@app.function(
    *,
    schedule: Cron | Period | None = None,
    liv: LivConfig | dict[str, Any] | tuple[Any, ...] | None = None,
    gpu: str | list[str] | None = None,
    gpu_count: int | None = None,
    min_vram_gb: int | None = None,
    geolocation: str | list[str] | None = None,
    offer_filters: dict[str, dict[str, Any]] | None = None,
    max_cost_per_hour_usd: float | None = None,
    max_cost_per_tflop_hour_usd: float | None = None,
    min_total_flops: float | None = None,
    min_gpu_ram_bandwidth_gbps: float | None = None,
    min_cpu_cores: float | None = None,
    min_cpu_ram_gb: float | None = None,
    min_cpu_ghz: float | None = None,
    min_disk_bandwidth_mb_s: float | None = None,
    min_internet_upload_mbps: float | None = None,
    min_internet_download_mbps: float | None = None,
    secure_cloud_only: bool | None = None,
    cpu: int = 1,
    memory_mb: int = 1024,
    disk_gb: float | None = None,
    timeout_s: int = 600,
    retries: int | Retries | None = None,
    max_retries: int = 0,
    concurrent_requests: int = 1,
    max_pending_tasks: int = 100,
    max_containers: int = 1,
    tasks_per_container: int = 1,
    image: Image | None = None,
    secrets: list[Secret] | None = None,
    volumes: dict[str, StorageMount] | list[StorageMount] | None = None,
)

Once a function is deployed, the lookup and invocation surface is:

RemoteFunction.from_name(
    app_name: str,
    function_name: str,
    *,
    client: ControlPlaneClient | None = None,
    base_url: str | None = None,
) -> RemoteFunction

fn.remote(*args, timeout_s: float = 600.0, **kwargs) -> Any
fn.spawn(*args, **kwargs) -> Job
fn.remote_gen(*args, timeout_s: float = 600.0, **kwargs)
fn.remote_events(*args, snapshot: bool = False, event_types: list[str] | tuple[str, ...] | None = None, **kwargs)
fn.spawn_map(items: Iterable[Any]) -> list[Job]
fn.map(items: Iterable[Any], *, timeout_s: float = 600.0) -> list[Any]
fn.starmap(items: Iterable[Any], *, timeout_s: float = 600.0) -> list[Any]
fn.for_each(items: Iterable[Any], *, timeout_s: float = 600.0) -> None

The same invocation helpers exist on deployed Function handles and on RemoteFunction lookups.

Once deployed, you can invoke that function in three common ways.

Generated functions with liv#

If you want the implementation to be generated from a stub before deploy, add @apothic.liv(...):

import apothic

app = apothic.App("generated-demo")


@apothic.liv(
    self_debug=True,
    cache="account",
    examples=[((" READY ",), {})],
)
@app.function(cpu=1, memory_mb=512, timeout_s=60)
def normalize_status(status: str) -> str:
    """Return exactly `status:` followed by the lowercase input stripped of outer whitespace."""
    raise NotImplementedError

The current mode is deploy-time freeze, which means the generated implementation is materialized before deploy and then shipped as ordinary Python.

remote() for request/response calls#

Use remote() when you want the result immediately:

result = summarize.remote("long input here")
print(result)

This is the simplest path for short-running work.

spawn() for background work#

Use spawn() when you want a job handle first and the result later:

job = summarize.spawn("generate a summary")
print(job.job_id)
print(job.status())
print(job.wait())

Useful methods on the returned Job:

  • job.status()
  • job.logs()
  • job.wait()
  • job.cancel()
  • job.watch()

Watch execution events#

You can watch typed execution events as the job progresses:

job = summarize.spawn("hello")

for event in job.watch(snapshot=True):
    print(event.event, event.data)

If you want to create the job and watch it in one step, use remote_events(...):

for event in summarize.remote_events("hello", snapshot=True):
    print(event.event, event.data)

This is especially useful for:

  • long-running jobs
  • interactive UIs
  • automation that wants progress before the final result

Async APIs#

The SDK also exposes async variants for deploy, invoke, job inspection, and operator flows:

from apothic import App, RemoteFunction

app = App("async-demo")


@app.function()
def infer(value: str) -> str:
    return value.upper()


async def run() -> None:
    await app.aio.deploy()
    remote = RemoteFunction.from_name("async-demo", "infer")
    job = await remote.aio.spawn("hello")
    print(await job.aio.status())
    print(await job.aio.wait())

Use the async surface when you are integrating with FastAPI, Starlette, workers, or any other event-loop-driven application.

Retries#

For retryable work, set either a simple retry count or an explicit retry policy:

from apothic import App, Retries

app = App("retry-demo")


@app.function(
    retries=Retries(
        max_retries=4,
        initial_delay=2.0,
        backoff_coefficient=2.0,
        max_delay=30.0,
    )
)
def fetch_record(record_id: str) -> dict:
    ...

Use retries for transient failures such as:

  • upstream API timeouts
  • temporary capacity gaps
  • short-lived dependency outages

Concurrency limits#

Use @concurrent(...) when you want tighter control over how much work is admitted at once:

from apothic import App, concurrent

app = App("concurrency-demo")


@app.function()
@concurrent(max_inputs=16, target_inputs=12)
def process(item: str) -> str:
    return item.upper()

This is a good fit for:

  • model servers with known saturation points
  • workloads that are memory-sensitive
  • APIs that slow down sharply under too much parallelism

Batch work into one invocation#

Use @batched(...) when your function is more efficient on a list of inputs than on single inputs:

from apothic import App, batched

app = App("batch-demo")


@app.function()
@batched(max_batch_size=8, wait_ms=50)
def score_texts(values: list[str]) -> list[int]:
    return [len(value) for value in values]

Clients can still submit normal single inputs. The SDK and runtime handle the batched execution path for you.

Placement filters and structured offer_filters#

You can start with the named convenience fields:

  • gpu
  • gpu_count
  • min_vram_gb
  • geolocation
  • max_cost_per_hour_usd

When you need range queries or less-common capacity fields, use offer_filters directly:

from apothic import App

app = App("filters-demo")


@app.function(
    gpu=["RTX_4070", "RTX_4070S", "RTX_4070_TI", "RTX_3080", "RTX_3060", "RTX_3060_TI"],
    gpu_count=1,
    min_vram_gb=10,
    geolocation=["US", "CA"],
    max_cost_per_hour_usd=1.5,
    offer_filters={
        "num_gpus": {"gte": 1, "lte": 1},
        "gpu_ram": {"gte": 10 * 1024, "lte": 24 * 1024},
        "verification": {"eq": "verified"},
        "cpu_arch": {"eq": "amd64"},
        "pci_gen": {"gte": 3},
    },
)
def inspect_capacity() -> str:
    return "ok"

The operator model accepted by offer_filters is:

  • eq
  • neq
  • gt
  • lt
  • gte
  • lte
  • in
  • notin

Use the named fields when they are readable enough. Use offer_filters when you need the full structured filter model.

Images, disk, and execution context#

Functions can also carry their own execution environment:

from apothic import App, Image

app = App("image-demo")


@app.function(
    image=(
        Image.debian_slim(python_version="3.11")
        .uv_pip_install("numpy", "pandas")
        .env(PYTHONUNBUFFERED="1")
        .run_commands("mkdir -p /app/cache")
        .add_local_python_source("my_package")
    ),
    disk_gb=160,
    timeout_s=900,
)
def analyze() -> str:
    return "ready"

The current image surface includes:

  • slim Python base images
  • arbitrary registry images
  • custom Dockerfiles
  • local files and directories added to build context
  • pip, apt, env, workdir, and shell-command configuration

For simple image.base cases, the runtime can execute the base image directly. For richer image specs, it can publish and reuse derived execution images. If a Linux base image does not already contain a suitable Python runtime, the current runtime can fall back to a managed CPython for execution and later reuse an optimized derived image.

Secrets and mounted storage#

Jobs can mount storage and consume secrets the same way services do:

from apothic import App, CloudBucketMount, Secret, Volume

app = App("storage-demo")

bucket = CloudBucketMount(
    bucket_name="team-datasets",
    access_key_secret="APOTHIC_BUCKET_ACCESS_KEY_SECRET",
    secret_key_secret="APOTHIC_BUCKET_SECRET_KEY_SECRET",
    endpoint_url="https://fly.storage.tigris.dev",
    prefix="examples/training",
).mounted_at("/bucket")


@app.function(
    secrets=[Secret.from_name("HF_TOKEN")],
    volumes=[
        Volume.from_name("model-cache", create_if_missing=True, mount_path_default="/cache").mounted_at("/cache"),
        bucket,
    ],
)
def prepare() -> str:
    return "mounted"

Reach for:

  • Volume.from_name(...) or Volume.cloud(...) for portable named storage
  • Volume.from_name(..., create_if_missing=True) when you want the concise reuse-or-create path
  • Volume.create(..., backend="vast_local") when the fastest persistent path matters more than portability
  • CloudBucketMount(...) when you already have a bucket and credentials

vast_local is host-affine after the first placement, and the client currently supports one vast_local volume per resource.

Common options#

Frequently used function options include:

  • liv
  • cpu
  • memory_mb
  • timeout_s
  • image
  • secrets
  • gpu
  • gpu_count
  • min_vram_gb
  • offer_filters
  • disk_gb
  • schedule
  • retries
  • service_startup_timeout_s on service-shaped resources

Other useful execution helpers include:

  • remote_gen(...)
  • spawn_map(...)
  • map(...)
  • starmap(...)
  • for_each(...)

Start with the smallest shape that works, then widen resources only where you have a clear reason.

What to learn next#