Apothic Client Functions and Jobs
Learn when to use remote calls, background jobs, async APIs, retries, batching, execution images, and flexible placement filters.
Most Apothic apps start with one or more @app.function(...) callables. They are the core unit for remote execution.
A minimal function
from apothic import App
app = App("reporting")
@app.function(cpu=1, memory_mb=1024, timeout_s=300)
def summarize(text: str) -> str:
    return text[:120]
Core signatures
The function decorator carries both execution settings and packaging settings:
@app.function(
    *,
    schedule: Cron | Period | None = None,
    liv: LivConfig | dict[str, Any] | tuple[Any, ...] | None = None,
    gpu: str | list[str] | None = None,
    gpu_count: int | None = None,
    min_vram_gb: int | None = None,
    geolocation: str | list[str] | None = None,
    offer_filters: dict[str, dict[str, Any]] | None = None,
    max_cost_per_hour_usd: float | None = None,
    max_cost_per_tflop_hour_usd: float | None = None,
    min_total_flops: float | None = None,
    min_gpu_ram_bandwidth_gbps: float | None = None,
    min_cpu_cores: float | None = None,
    min_cpu_ram_gb: float | None = None,
    min_cpu_ghz: float | None = None,
    min_disk_bandwidth_mb_s: float | None = None,
    min_internet_upload_mbps: float | None = None,
    min_internet_download_mbps: float | None = None,
    secure_cloud_only: bool | None = None,
    cpu: int = 1,
    memory_mb: int = 1024,
    disk_gb: float | None = None,
    timeout_s: int = 600,
    retries: int | Retries | None = None,
    max_retries: int = 0,
    concurrent_requests: int = 1,
    max_pending_tasks: int = 100,
    max_containers: int = 1,
    tasks_per_container: int = 1,
    image: Image | None = None,
    secrets: list[Secret] | None = None,
    volumes: dict[str, StorageMount] | list[StorageMount] | None = None,
)
Once a function is deployed, the lookup and invocation surface is:
RemoteFunction.from_name(
    app_name: str,
    function_name: str,
    *,
    client: ControlPlaneClient | None = None,
    base_url: str | None = None,
) -> RemoteFunction
fn.remote(*args, timeout_s: float = 600.0, **kwargs) -> Any
fn.spawn(*args, **kwargs) -> Job
fn.remote_gen(*args, timeout_s: float = 600.0, **kwargs)
fn.remote_events(*args, snapshot: bool = False, event_types: list[str] | tuple[str, ...] | None = None, **kwargs)
fn.spawn_map(items: Iterable[Any]) -> list[Job]
fn.map(items: Iterable[Any], *, timeout_s: float = 600.0) -> list[Any]
fn.starmap(items: Iterable[Any], *, timeout_s: float = 600.0) -> list[Any]
fn.for_each(items: Iterable[Any], *, timeout_s: float = 600.0) -> None
The same invocation helpers exist on deployed Function handles and on RemoteFunction lookups.
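The signatures do not spell out how `map` and `starmap` differ in the way they pass each item. A reasonable reading, by analogy with Python's built-in `map` and `itertools.starmap`, is that `map` passes each item as one argument while `starmap` unpacks each item into positional arguments. The local stand-ins below illustrate that assumed difference only; `local_map`, `local_starmap`, and `truncate` are hypothetical helpers and not part of the Apothic SDK.

```python
from itertools import starmap
from typing import Any, Callable, Iterable


def local_map(fn: Callable[..., Any], items: Iterable[Any]) -> list[Any]:
    """Call fn(item) once per item and collect results in input order."""
    return [fn(item) for item in items]


def local_starmap(fn: Callable[..., Any], items: Iterable[Iterable[Any]]) -> list[Any]:
    """Call fn(*item) once per item, unpacking each item into positional arguments."""
    return list(starmap(fn, items))


def truncate(text: str, limit: int = 5) -> str:
    """Hypothetical worker function used only for this illustration."""
    return text[:limit]


# One argument per call: truncate("alphabet"), truncate("beta")
single_arg_results = local_map(truncate, ["alphabet", "beta"])

# One tuple per call, unpacked: truncate("alphabet", 3), truncate("beta", 2)
multi_arg_results = local_starmap(truncate, [("alphabet", 3), ("beta", 2)])
```

If the SDK follows the same convention, `fn.map(["a", "b"])` would suit single-argument functions and `fn.starmap([("a", 1), ("b", 2)])` would suit functions that take several positional arguments.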
Once deployed, you can invoke that function in three common ways.
Generated functions with liv
If you want the implementation to be generated from a stub before deploy, add @apothic.liv(...):
import apothic
app = apothic.App("generated-demo")
@apothic.liv(
    self_debug=True,
    cache="account",
    examples=[((" READY ",), {})],
)
@app.function(cpu=1, memory_mb=512, timeout_s=60)
def normalize_status(status: str) -> str:
    """Return exactly `status:` followed by the lowercase input stripped of outer whitespace."""
    raise NotImplementedError
The current mode is deploy-time freeze, which means the generated implementation is materialized before deploy and then shipped as ordinary Python.
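To make the stub contract concrete, here is a hand-written body that satisfies the docstring above, together with a small runner that applies each `examples` entry. The runner assumes every entry is an `(args, kwargs)` pair, which is how the single example in the stub reads. The generated implementation may well look different, and `run_examples` is a hypothetical helper, not an SDK function.

```python
from typing import Any, Callable

# Same shape as the `examples` argument in the stub: a list of (args, kwargs) pairs.
Example = tuple[tuple[Any, ...], dict[str, Any]]
examples: list[Example] = [((" READY ",), {})]


def normalize_status(status: str) -> str:
    """Hand-written body that follows the stub's docstring."""
    return "status:" + status.strip().lower()


def run_examples(fn: Callable[..., Any], cases: list[Example]) -> list[Any]:
    """Call fn(*args, **kwargs) for every example and collect the outputs."""
    return [fn(*args, **kwargs) for args, kwargs in cases]


outputs = run_examples(normalize_status, examples)
print(outputs)  # ['status:ready']
```

Running the examples locally like this is a cheap way to confirm that the docstring is precise enough before relying on a generated body.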
remote() for request/response calls
Use remote() when you want the result immediately:
result = summarize.remote("long input here")
print(result)
This is the simplest path for short-running work.
spawn() for background work
Use spawn() when you want a job handle first and the result later:
job = summarize.spawn("generate a summary")
print(job.job_id)
print(job.status())
print(job.wait())
Useful methods on the returned Job:
- job.status()
- job.logs()
- job.wait()
- job.cancel()
- job.watch()
Watch execution events
You can watch typed execution events as the job progresses:
job = summarize.spawn("hello")
for event in job.watch(snapshot=True):
    print(event.event, event.data)
If you want to create the job and watch it in one step, use remote_events(...):
for event in summarize.remote_events("hello", snapshot=True):
    print(event.event, event.data)
This is especially useful for:
- long-running jobs
- interactive UIs
- automation that wants progress before the final result
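As a sketch of the progress-tracking case, the consumer below folds a stream of events into a small state dictionary and stops at the first terminal event. It relies only on the `event` and `data` attributes shown above. The event names `"progress"` and `"completed"`, the `fraction` and `result` keys, and the `FakeEvent` class are all assumptions made for illustration; check the real event types before relying on them.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable


@dataclass
class FakeEvent:
    """Stand-in with the two attributes used above: `event` and `data`."""
    event: str
    data: dict[str, Any] = field(default_factory=dict)


def summarize_events(events: Iterable[FakeEvent]) -> dict[str, Any]:
    """Track the latest progress value and stop at the first terminal event."""
    state: dict[str, Any] = {"progress": 0.0, "result": None, "seen": 0}
    for ev in events:
        state["seen"] += 1
        if ev.event == "progress":  # hypothetical event name
            state["progress"] = ev.data.get("fraction", state["progress"])
        elif ev.event == "completed":  # hypothetical event name
            state["result"] = ev.data.get("result")
            break
    return state


stream = [
    FakeEvent("progress", {"fraction": 0.5}),
    FakeEvent("progress", {"fraction": 0.9}),
    FakeEvent("completed", {"result": "hello"}),
    FakeEvent("progress", {"fraction": 1.0}),  # never consumed: the loop stops at "completed"
]
final_state = summarize_events(stream)
```

In real use, the iterable would be `job.watch(snapshot=True)` or `summarize.remote_events(...)` instead of the hard-coded list.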
Async APIs
The SDK also exposes async variants for deploy, invoke, job inspection, and operator flows:
from apothic import App, RemoteFunction
app = App("async-demo")
@app.function()
def infer(value: str) -> str:
    return value.upper()
async def run() -> None:
    await app.aio.deploy()
    remote = RemoteFunction.from_name("async-demo", "infer")
    job = await remote.aio.spawn("hello")
    print(await job.aio.status())
    print(await job.aio.wait())
Use the async surface when you are integrating with FastAPI, Starlette, workers, or any other event-loop-driven application.
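One pattern the async surface makes possible is fanning out several jobs and awaiting them together. The sketch below shows that pattern with `asyncio.gather`. It uses local stand-in classes that mimic only the `remote.aio.spawn(...)` and `job.aio.wait()` calls from the example above, so it runs without a deployment; `FakeRemote`, `FakeJob`, and `infer_many` are hypothetical names.

```python
import asyncio


class _FakeJobAio:
    def __init__(self, value: str) -> None:
        self._value = value

    async def wait(self) -> str:
        await asyncio.sleep(0.01)  # simulate remote execution latency
        return self._value.upper()


class FakeJob:
    """Stand-in exposing `job.aio.wait()`."""
    def __init__(self, value: str) -> None:
        self.aio = _FakeJobAio(value)


class _FakeRemoteAio:
    async def spawn(self, value: str) -> FakeJob:
        await asyncio.sleep(0)  # yield control, as a network call would
        return FakeJob(value)


class FakeRemote:
    """Stand-in exposing `remote.aio.spawn(...)`."""
    def __init__(self) -> None:
        self.aio = _FakeRemoteAio()


async def infer_many(remote: FakeRemote, values: list[str]) -> list[str]:
    """Spawn one job per value, then await all results concurrently."""
    jobs = await asyncio.gather(*(remote.aio.spawn(v) for v in values))
    # gather returns results in the same order as the awaitables passed in.
    return list(await asyncio.gather(*(job.aio.wait() for job in jobs)))


results = asyncio.run(infer_many(FakeRemote(), ["a", "b", "c"]))
```

Inside a FastAPI or Starlette handler you would `await infer_many(...)` directly instead of calling `asyncio.run`, since the framework already owns the event loop.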
Retries
For retryable work, set either a simple retry count or an explicit retry policy:
from apothic import App, Retries
app = App("retry-demo")
@app.function(
    retries=Retries(
        max_retries=4,
        initial_delay=2.0,
        backoff_coefficient=2.0,
        max_delay=30.0,
    )
)
def fetch_record(record_id: str) -> dict:
    ...
Use retries for transient failures such as:
- upstream API timeouts
- temporary capacity gaps
- short-lived dependency outages
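The parameter names on `Retries` suggest capped exponential backoff. Assuming the delay before retry `n` (counting from zero) is `initial_delay * backoff_coefficient ** n`, limited to `max_delay` and with no jitter, the schedule for the policy above can be worked out locally. This formula is an assumption about the runtime, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(
    max_retries: int,
    initial_delay: float,
    backoff_coefficient: float,
    max_delay: float,
) -> list[float]:
    """Delay in seconds before each retry, under the assumed capped exponential rule."""
    return [
        min(initial_delay * backoff_coefficient ** attempt, max_delay)
        for attempt in range(max_retries)
    ]


# The policy from the example above: four retries starting at 2 seconds.
delays = backoff_delays(4, 2.0, 2.0, 30.0)
print(delays)  # [2.0, 4.0, 8.0, 16.0]

# With more retries, the cap starts to apply.
capped = backoff_delays(6, 2.0, 2.0, 30.0)
print(capped)  # [2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Under that assumption, the example policy spends at most 30 seconds waiting between attempts in total, which is worth comparing against the function's `timeout_s` and any caller-side timeout.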
Concurrency limits
Use @concurrent(...) when you want tighter control over how much work is admitted at once:
from apothic import App, concurrent
app = App("concurrency-demo")
@app.function()
@concurrent(max_inputs=16, target_inputs=12)
def process(item: str) -> str:
    return item.upper()
This is a good fit for:
- model servers with known saturation points
- workloads that are memory-sensitive
- APIs that slow down sharply under too much parallelism
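To illustrate what an admission cap means in practice, the sketch below uses an `asyncio.Semaphore` to keep at most `max_inputs` calls in flight and records the observed peak. It is a local analogy, not how the Apothic runtime enforces `@concurrent(...)`, and it does not model `target_inputs` at all. `run_with_limit` is a hypothetical helper.

```python
import asyncio


async def run_with_limit(items: list[str], max_inputs: int) -> tuple[list[str], int]:
    """Process items concurrently, never admitting more than max_inputs at once."""
    gate = asyncio.Semaphore(max_inputs)
    in_flight = 0
    peak = 0

    async def process(item: str) -> str:
        nonlocal in_flight, peak
        async with gate:  # waits here while max_inputs calls are already running
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated work
            in_flight -= 1
            return item.upper()

    results = await asyncio.gather(*(process(item) for item in items))
    return list(results), peak


results, peak_in_flight = asyncio.run(
    run_with_limit([f"item-{n}" for n in range(40)], max_inputs=16)
)
```

Forty inputs are submitted at once, yet the recorded peak never exceeds 16, which is the behavior you want in front of a model server with a known saturation point.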
Batch work into one invocation
Use @batched(...) when your function is more efficient on a list of inputs than on single inputs:
from apothic import App, batched
app = App("batch-demo")
@app.function()
@batched(max_batch_size=8, wait_ms=50)
def score_texts(values: list[str]) -> list[int]:
    return [len(value) for value in values]
Clients can still submit normal single inputs. The SDK and runtime handle the batched execution path for you.
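Conceptually, the batched path groups single inputs into lists, calls the function once per list, and hands each caller its own result. The sketch below shows only the size-based grouping with `max_batch_size`; it ignores `wait_ms` and makes no claim about how the runtime actually schedules batches. `run_in_batches` is a hypothetical helper.

```python
from typing import Callable, Sequence


def score_texts(values: list[str]) -> list[int]:
    """Same body as the batched function above: one output per input."""
    return [len(value) for value in values]


def run_in_batches(
    fn: Callable[[list[str]], list[int]],
    inputs: Sequence[str],
    max_batch_size: int,
) -> tuple[list[int], list[int]]:
    """Call fn on consecutive chunks and return (per-input results, batch sizes)."""
    results: list[int] = []
    batch_sizes: list[int] = []
    for start in range(0, len(inputs), max_batch_size):
        batch = list(inputs[start:start + max_batch_size])
        outputs = fn(batch)
        if len(outputs) != len(batch):
            # A batched function must return exactly one output per input.
            raise ValueError("batched function returned a result list of the wrong length")
        results.extend(outputs)
        batch_sizes.append(len(batch))
    return results, batch_sizes


inputs = [f"text-{n}" for n in range(19)]
results, batch_sizes = run_in_batches(score_texts, inputs, max_batch_size=8)
print(batch_sizes)  # [8, 8, 3]
```

The length check highlights the main contract a batched function has to honor: outputs must line up one-to-one with inputs, in the same order.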
Placement filters and structured offer_filters
You can start with the named convenience fields:
- gpu
- gpu_count
- min_vram_gb
- geolocation
- max_cost_per_hour_usd
When you need range queries or less-common capacity fields, use offer_filters directly:
from apothic import App
app = App("filters-demo")
@app.function(
    gpu=["RTX_4070", "RTX_4070S", "RTX_4070_TI", "RTX_3080", "RTX_3060", "RTX_3060_TI"],
    gpu_count=1,
    min_vram_gb=10,
    geolocation=["US", "CA"],
    max_cost_per_hour_usd=1.5,
    offer_filters={
        "num_gpus": {"gte": 1, "lte": 1},
        "gpu_ram": {"gte": 10 * 1024, "lte": 24 * 1024},
        "verification": {"eq": "verified"},
        "cpu_arch": {"eq": "amd64"},
        "pci_gen": {"gte": 3},
    },
)
def inspect_capacity() -> str:
    return "ok"
The operator model accepted by offer_filters is:
- eq
- neq
- gt
- lt
- gte
- lte
- in
- notin
Use the named fields when they are readable enough. Use offer_filters when you need the full structured filter model.
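The structured filter model can be read as "every operator on every listed field must hold". The evaluator below sketches that reading against a plain dictionary so you can sanity-check a filter set before deploying. The operator semantics, the treatment of missing fields, and the sample offers are assumptions for illustration; `offer_matches` is a hypothetical helper, and the control plane may evaluate filters differently.

```python
import operator
from typing import Any, Callable

# Assumed meaning of each operator name in the filter model.
OPERATORS: dict[str, Callable[[Any, Any], bool]] = {
    "eq": operator.eq,
    "neq": operator.ne,
    "gt": operator.gt,
    "lt": operator.lt,
    "gte": operator.ge,
    "lte": operator.le,
    "in": lambda value, allowed: value in allowed,
    "notin": lambda value, denied: value not in denied,
}


def offer_matches(offer: dict[str, Any], filters: dict[str, dict[str, Any]]) -> bool:
    """Return True only if every operator on every filtered field holds."""
    for field_name, conditions in filters.items():
        if field_name not in offer:
            return False  # assumption: a missing field never matches
        for op_name, expected in conditions.items():
            if not OPERATORS[op_name](offer[field_name], expected):
                return False
    return True


filters = {
    "num_gpus": {"gte": 1, "lte": 1},
    "gpu_ram": {"gte": 10 * 1024, "lte": 24 * 1024},
    "verification": {"eq": "verified"},
}
# Made-up offers for illustration only.
good_offer = {"num_gpus": 1, "gpu_ram": 12 * 1024, "verification": "verified"}
small_offer = {"num_gpus": 1, "gpu_ram": 8 * 1024, "verification": "verified"}

print(offer_matches(good_offer, filters))   # True
print(offer_matches(small_offer, filters))  # False
```

Combining `gte` and `lte` on one field, as the `gpu_ram` entry does, expresses a closed range, which is the main thing the named convenience fields cannot do.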
Images, disk, and execution context
Functions can also carry their own execution environment:
from apothic import App, Image
app = App("image-demo")
@app.function(
    image=(
        Image.debian_slim(python_version="3.11")
        .uv_pip_install("numpy", "pandas")
        .env(PYTHONUNBUFFERED="1")
        .run_commands("mkdir -p /app/cache")
        .add_local_python_source("my_package")
    ),
    disk_gb=160,
    timeout_s=900,
)
def analyze() -> str:
    return "ready"
The current image surface includes:
- slim Python base images
- arbitrary registry images
- custom Dockerfiles
- local files and directories added to build context
- pip, apt, env, workdir, and shell-command configuration
For simple image.base cases, the runtime can execute the base image directly. For richer image specs, it can publish and reuse derived execution images. If a Linux base image does not already contain a suitable Python runtime, the current runtime can fall back to a managed CPython for execution and later reuse an optimized derived image.
Secrets and mounted storage
Jobs can mount storage and consume secrets the same way services do:
from apothic import App, CloudBucketMount, Secret, Volume
app = App("storage-demo")
bucket = CloudBucketMount(
    bucket_name="team-datasets",
    access_key_secret="APOTHIC_BUCKET_ACCESS_KEY_SECRET",
    secret_key_secret="APOTHIC_BUCKET_SECRET_KEY_SECRET",
    endpoint_url="https://fly.storage.tigris.dev",
    prefix="examples/training",
).mounted_at("/bucket")
@app.function(
    secrets=[Secret.from_name("HF_TOKEN")],
    volumes=[
        Volume.from_name("model-cache", create_if_missing=True, mount_path_default="/cache").mounted_at("/cache"),
        bucket,
    ],
)
def prepare() -> str:
    return "mounted"
Reach for:
- Volume.from_name(...) or Volume.cloud(...) for portable named storage
- Volume.from_name(..., create_if_missing=True) when you want the concise reuse-or-create path
- Volume.create(..., backend="vast_local") when the fastest persistent path matters more than portability
- CloudBucketMount(...) when you already have a bucket and credentials
vast_local is host-affine after the first placement, and the client currently supports one vast_local volume per resource.
Common options
Frequently used function options include:
- liv
- cpu
- memory_mb
- timeout_s
- image
- secrets
- gpu
- gpu_count
- min_vram_gb
- offer_filters
- disk_gb
- schedule
- retries
- service_startup_timeout_s on service-shaped resources
Other useful execution helpers include:
- remote_gen(...)
- spawn_map(...)
- map(...)
- starmap(...)
- for_each(...)
Start with the smallest shape that works, then widen resources only where you have a clear reason.
