getfluxly (Python)

Python SDK

Server-side event ingestion from any Python runtime. The SDK mirrors the Node SDK's shape and batching defaults, and the same package exposes both a synchronous Client and an AsyncClient.

Install

pip install getfluxly

Supports Python 3.10+. The only runtime dependency is httpx.

Quick start (sync)

from getfluxly import Client

client = Client(token="gflux_secret_yourtoken")

client.track("subscription_started",
             external_id="user_42",
             properties={"plan": "pro"})

client.identify(external_id="user_42",
                traits={"email": "x@y.com", "plan": "pro"})

client.alias(user_id="user_42",
             anonymous_id="anon_a8f3c2")

client.flush()
client.shutdown()  # also runs from atexit

Quick start (async)

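import asyncio

from getfluxly import AsyncClient


async def main() -> None:
    # `async with` is only valid inside a coroutine
    async with AsyncClient(token="gflux_secret_yourtoken") as ac:
        await ac.track("subscription_started",
                       external_id="user_42",
                       properties={"plan": "pro"})

asyncio.run(main())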

Configuration

All options mirror the Node SDK, so batching and retry behavior can be reasoned about with one mental model across SDKs.

| Option | Default | Notes |
| --- | --- | --- |
| flush_at | 20 | Events queued before forced flush |
| flush_interval | 5.0 | Periodic flush cadence, seconds |
| max_retries | 2 | Per failed batch |
| timeout | 5.0 | Per HTTP request, seconds |
| max_queue_size | 1000 | Hard cap, raises queue_overflow |
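To make the interplay of flush_at and max_queue_size concrete, here is a minimal, self-contained sketch of a batching queue. It is not the SDK's implementation: BatchQueue, QueueOverflow, and the send callback are hypothetical names used for illustration, and the periodic flush driven by flush_interval is left out for brevity.

```python
from dataclasses import dataclass, field
from typing import Callable


class QueueOverflow(Exception):
    """Stand-in for an SDK error whose code is "queue_overflow"."""
    code = "queue_overflow"


@dataclass
class BatchQueue:
    # Hypothetical transport callback: returns True if the batch was accepted.
    send: Callable[[list], bool]
    flush_at: int = 20
    max_queue_size: int = 1000
    _events: list = field(default_factory=list, init=False)

    def enqueue(self, event: dict) -> None:
        # Hard cap: refuse new events instead of growing without bound.
        if len(self._events) >= self.max_queue_size:
            raise QueueOverflow("queue is full")
        self._events.append(event)
        # Forced flush once enough events are waiting.
        if len(self._events) >= self.flush_at:
            self.flush()

    def flush(self) -> None:
        # Drain the queue in batches of at most flush_at events.
        while self._events:
            batch = self._events[: self.flush_at]
            if not self.send(batch):
                break  # keep events queued for a later attempt
            del self._events[: len(batch)]
```

In this sketch the queue only overflows when batches cannot be delivered, which is why queue_overflow is best read as a back-pressure signal rather than a programming error.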

The Retry-After response header is honored. Each batch carries a unique X-Idempotency-Key that stays the same across retries. Exponential backoff is applied with ±25% jitter.
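The retry behavior described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the SDK's code: the 0.5 s base delay and the doubling factor are guesses (the docs only state the ±25% jitter), and post is a hypothetical callable that returns an (ok, retry_after) pair.

```python
import random
import time
import uuid
from typing import Callable, Optional

BASE_DELAY = 0.5  # assumed; the docs do not state the base delay
JITTER = 0.25     # ±25%, as described above


def backoff_delay(attempt: int,
                  retry_after: Optional[float] = None,
                  rng: Callable[[], float] = random.random) -> float:
    """Seconds to wait before retry number `attempt` (1-based)."""
    if retry_after is not None:
        return retry_after  # a server-provided delay takes precedence
    base = BASE_DELAY * 2 ** (attempt - 1)  # assumed doubling per retry
    # rng() is in [0, 1); map it to a factor in [0.75, 1.25).
    factor = 1 - JITTER + 2 * JITTER * rng()
    return base * factor


def send_with_retries(post, batch, max_retries: int = 2,
                      sleep: Callable[[float], None] = time.sleep) -> bool:
    """Try one batch up to max_retries + 1 times; return True on success."""
    key = str(uuid.uuid4())  # one key per batch, reused on every attempt
    for attempt in range(max_retries + 1):
        ok, retry_after = post(batch, {"X-Idempotency-Key": key})
        if ok:
            return True
        if attempt < max_retries:
            sleep(backoff_delay(attempt + 1, retry_after))
    return False
```

Reusing the same key on every attempt is what lets a server deduplicate a batch that was received but whose response was lost.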

Errors

from getfluxly import GFluxError

try:
    client.track("invoice_paid", external_id="user_42")
except GFluxError as e:
    if e.code == "queue_overflow":
        # back-pressure: the queue is full (max_queue_size reached)
        ...
    elif e.retryable:
        # SDK already retried max_retries times
        ...