getfluxly (Python)
Python SDK
Server-side ingest from any Python runtime. The SDK mirrors the Node SDK's shape and batching defaults. Both a synchronous Client and an AsyncClient are exported from the same package.
Install
pip install getfluxly
Supports Python 3.10+. The only runtime dependency is httpx.
Quick start (sync)
from getfluxly import Client
client = Client(token="gflux_secret_yourtoken")
client.track("subscription_started",
             external_id="user_42",
             properties={"plan": "pro"})
client.identify(external_id="user_42",
                traits={"email": "x@y.com", "plan": "pro"})
client.alias(user_id="user_42",
             anonymous_id="anon_a8f3c2")
client.flush()
client.shutdown() # also runs from atexit
Quick start (async)
import asyncio
from getfluxly import AsyncClient

async def main():
    # async with must run inside a coroutine
    async with AsyncClient(token="gflux_secret_yourtoken") as ac:
        await ac.track("subscription_started",
                       external_id="user_42",
                       properties={"plan": "pro"})
asyncio.run(main())
Configuration
All options mirror the Node SDK, so the same mental model for batching and retries applies across SDKs.
| Option | Default | Notes |
| --- | --- | --- |
| flush_at | 20 | Events queued before forced flush |
| flush_interval | 5.0 | Periodic flush cadence, seconds |
| max_retries | 2 | Per failed batch |
| timeout | 5.0 | Per HTTP request, seconds |
| max_queue_size | 1000 | Hard cap on queued events; exceeding it raises queue_overflow |
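To make the interaction between flush_at and max_queue_size concrete, here is a toy in-memory model. It is a sketch, not the SDK's implementation: ToyQueue, QueueOverflow, and the send callback are hypothetical names, and the model ignores flush_interval, retries, and threading.

```python
from typing import Callable


class QueueOverflow(Exception):
    """Hypothetical stand-in for an error with code 'queue_overflow'."""


class ToyQueue:
    """Toy model of a batching queue; not the real getfluxly internals."""

    def __init__(self, send: Callable[[list[dict]], bool],
                 flush_at: int = 20, max_queue_size: int = 1000) -> None:
        self.send = send                    # returns True if the batch was delivered
        self.flush_at = flush_at
        self.max_queue_size = max_queue_size
        self.events: list[dict] = []

    def enqueue(self, event: dict) -> None:
        # Hard cap: refuse new events once the queue is full.
        if len(self.events) >= self.max_queue_size:
            raise QueueOverflow("queue_overflow")
        self.events.append(event)
        # Forced flush once flush_at events are waiting.
        if len(self.events) >= self.flush_at:
            self.flush()

    def flush(self) -> None:
        batch = self.events[:self.flush_at]
        if batch and self.send(batch):
            del self.events[:len(batch)]    # drop only what was delivered
```

In this model the queue only grows toward max_queue_size when delivery keeps failing or events arrive faster than they can be flushed, which is the situation the overflow error is meant to signal.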
The Retry-After response header is honored. Each batch carries a unique X-Idempotency-Key that stays the same across retries of that batch. Exponential backoff applies ±25% jitter.
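The retry behavior described here can be sketched as follows. This is an illustration under stated assumptions, not the SDK's code: backoff_delay, send_with_retries, and the post callback are hypothetical, as are the 0.5-second base delay, the doubling factor, and the choice to retry only on 429 and 5xx responses.

```python
import random
import time
import uuid
from typing import Callable, Optional


def backoff_delay(attempt: int, base: float = 0.5, jitter: float = 0.25,
                  retry_after: Optional[float] = None) -> float:
    """Seconds to wait before the next try; `attempt` is 0-based."""
    if retry_after is not None:
        return retry_after                    # a server-provided Retry-After wins
    delay = base * (2 ** attempt)             # assumed exponential schedule
    return delay * random.uniform(1 - jitter, 1 + jitter)  # +/-25% jitter


def send_with_retries(post: Callable[[list, dict], tuple[int, Optional[float]]],
                      batch: list, max_retries: int = 2,
                      sleep: Callable[[float], None] = time.sleep) -> bool:
    """`post` returns (status_code, retry_after_seconds_or_None)."""
    # One key per batch, generated once and reused on every retry.
    headers = {"X-Idempotency-Key": str(uuid.uuid4())}
    for attempt in range(max_retries + 1):
        status, retry_after = post(batch, headers)
        if status < 400:
            return True
        if status != 429 and status < 500:
            return False                      # assumed non-retryable client error
        if attempt < max_retries:
            sleep(backoff_delay(attempt, retry_after=retry_after))
    return False
```

Reusing the key lets the server deduplicate a batch that was actually received but whose response was lost, so a retry does not double-count events.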
Errors
from getfluxly import GFluxError
try:
    client.track("invoice_paid", external_id="user_42")
except GFluxError as e:
    if e.code == "queue_overflow":
        # back-pressure; the queue has reached max_queue_size
        ...
    elif e.retryable:
        # SDK already retried max_retries times
        ...