# Edge Workers

> Hook-based worker modules for on-device ML inference and event-driven processing at the edge.

<Warning>
  **STUB DOCUMENT:** This page is intentionally minimal and will be expanded with deeper technical details in a future update.
</Warning>

## Overview

Edge workers are Python modules that run inside worker containers on edge devices. Each worker registers **hooks** (callbacks for sensor data) and loads models through the `cw` client, which is already available in the module, so no imports are needed.

**Architecture:** one edge device → one worker container → one runtime → many worker modules.

## Quick Example

```python
# /app/workers/detect_people.py — loaded by the runtime automatically

model = cw.models.load("yolov8n")
twin_uuid = cw.config.twin_uuid

@cw.on_frame(twin_uuid, sensor="front")
def detect_people(frame, ctx):
    results = model.predict(frame, classes=["person"], confidence=0.5)
    for det in results:
        if det.area_ratio > 0.3:
            cw.publish_event(twin_uuid, "person_too_close", {
                "detections": len(results),
                "frame_ts": ctx.timestamp,
            })
            break  # one event per frame, even if several people are too close
```

## Hook Decorators

Hook decorators register callbacks for sensor streams. Registration is passive at import time: decorating a function only records it, and the runtime activates the hooks once the module is loaded.

| Decorator                              | Channel                                       | Sensor kwarg |
| -------------------------------------- | --------------------------------------------- | ------------ |
| `@cw.on_frame(uuid, sensor=None)`      | `frames/{sensor}` (wildcard when omitted)     | `sensor`     |
| `@cw.on_depth(uuid, sensor=None)`      | `depth/{sensor}` (wildcard when omitted)      | `sensor`     |
| `@cw.on_audio(uuid, sensor=None)`      | `audio/{sensor}` (wildcard when omitted)      | `sensor`     |
| `@cw.on_pointcloud(uuid, sensor=None)` | `pointcloud/{sensor}` (wildcard when omitted) | `sensor`     |
| `@cw.on_lidar(uuid, sensor=None)`      | `lidar/{sensor}` (wildcard when omitted)      | `sensor`     |
| `@cw.on_imu(uuid)`                     | `imu`                                         | —            |
| `@cw.on_joint_states(uuid)`            | `joint_states`                                | —            |
| `@cw.on_gps(uuid)`                     | `gps`                                         | —            |
| `@cw.on_battery(uuid)`                 | `battery`                                     | —            |
| `@cw.on_data(uuid, channel)`           | custom                                        | —            |

All callbacks receive `(sample_payload, ctx)` where `ctx` is a `HookContext` with `timestamp`, `channel`, `sensor_name`, `twin_uuid`, and `metadata`.

<Note>
  Passing `sensor=None` (or omitting it) subscribes to every sensor of that
  type on the twin — the SDK constructs a `frames/**` (or `depth/**`, etc.)
  wildcard key expression. `ctx.sensor_name` is populated from the observed
  publish key so a single handler can disambiguate multi-sensor twins.
  Pin `sensor="<name>"` (e.g. `"color_camera"`) when you need to target one
  specific sensor — the name must match what the twin asset declares.
</Note>
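
The wildcard rule can be illustrated with two small helpers. This is a sketch, not SDK code: `key_expr` and `sensor_from_key` are hypothetical names, and the keys shown cover only the channel part (`frames/front`), assuming any twin-specific prefix has already been stripped.

```python
from typing import Optional


def key_expr(stream: str, sensor: Optional[str] = None) -> str:
    """Return `stream/sensor`, or the `stream/**` wildcard when sensor is omitted."""
    return f"{stream}/{sensor}" if sensor else f"{stream}/**"


def sensor_from_key(stream: str, publish_key: str) -> Optional[str]:
    """Recover the sensor name from an observed key such as `frames/front`."""
    prefix = f"{stream}/"
    if not publish_key.startswith(prefix):
        return None  # key belongs to a different stream
    return publish_key[len(prefix):] or None


print(key_expr("frames"))           # wildcard subscription
print(key_expr("frames", "front"))  # pinned to one sensor
print(sensor_from_key("frames", "frames/color_camera"))
```

A handler subscribed with the wildcard can branch on `ctx.sensor_name` in the same way the second helper branches on the key suffix.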

## Model Loading

```python
model = cw.models.load("yolov8n")                    # auto-detect runtime
model = cw.models.load("my-model", runtime="ultralytics", device="cuda:0")
result = model.predict(frame, confidence=0.5, classes=["person"])
```

`cw.models.load()` caches loaded models, so it is safe to call at module level.
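
To show why caching makes a module-level call safe, here is a minimal sketch of a caching loader. It is an illustration only: `ModelCache`, `fake_loader`, and the choice of `(name, runtime, device)` as the cache key are assumptions, not the SDK's internals.

```python
from typing import Any, Callable, Dict, Optional, Tuple

CacheKey = Tuple[str, Optional[str], Optional[str]]


class ModelCache:
    """Hypothetical cache keyed by (name, runtime, device); not the SDK's code."""

    def __init__(self, loader: Callable[[str, Optional[str], Optional[str]], Any]):
        self._loader = loader
        self._models: Dict[CacheKey, Any] = {}

    def load(self, name: str, runtime: Optional[str] = None,
             device: Optional[str] = None) -> Any:
        key = (name, runtime, device)
        if key not in self._models:  # only the first call pays the load cost
            self._models[key] = self._loader(name, runtime, device)
        return self._models[key]


load_calls = []


def fake_loader(name, runtime, device):
    """Stand-in for reading weights from disk; records each real load."""
    load_calls.append(name)
    return object()


cache = ModelCache(fake_loader)
first = cache.load("yolov8n")
second = cache.load("yolov8n")  # served from the cache, loader not called again
print(first is second, len(load_calls))
```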

## Publishing Events

```python
cw.publish_event(twin_uuid, "event_type", {"key": "value"})
```

The event is published to `cyberwave/twin/{uuid}/event` via MQTT. The payload matches the schema expected by the backend's `mqtt_consumer.handle_business_event()`.

## Configuration

| Env var                  | Purpose                                                                                                                                                                                                                                                                         |
| ------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `CYBERWAVE_TWIN_UUID`    | Twin UUID, available as `cw.config.twin_uuid`                                                                                                                                                                                                                                   |
| `CYBERWAVE_WORKERS_DIR`  | Workers directory (default: `/app/workers`)                                                                                                                                                                                                                                     |
| `CYBERWAVE_MODEL_DIR`    | Model weights directory (default: `/app/models`)                                                                                                                                                                                                                                |
| `CYBERWAVE_MODEL_DEVICE` | Default inference device. Unset = auto-detect with a cuDNN probe (falls back to `cpu` if CUDA is present but cuDNN can't execute a conv2d — e.g. Pascal/sm\_61 on cuDNN 9, or very new archs not in the torch build). Set to `cuda:0` to force GPU, or `cpu` to skip the probe. |
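
The table can be read as the following sketch of how the variables and their defaults might be resolved. `read_worker_config` and `resolve_device` are hypothetical helpers, the `gpu_usable` callable stands in for the cuDNN probe, and returning `cuda:0` when the probe succeeds is an assumption.

```python
import os
from typing import Callable, Mapping, Optional


def read_worker_config(env: Mapping[str, str] = os.environ) -> dict:
    """Collect the documented variables, applying the documented defaults."""
    return {
        "twin_uuid": env.get("CYBERWAVE_TWIN_UUID"),
        "workers_dir": env.get("CYBERWAVE_WORKERS_DIR", "/app/workers"),
        "model_dir": env.get("CYBERWAVE_MODEL_DIR", "/app/models"),
        "model_device": env.get("CYBERWAVE_MODEL_DEVICE"),  # None means auto-detect
    }


def resolve_device(configured: Optional[str], gpu_usable: Callable[[], bool]) -> str:
    """An explicit setting wins and skips the probe; otherwise probe, else use cpu."""
    if configured:
        return configured
    return "cuda:0" if gpu_usable() else "cpu"


config = read_worker_config({"CYBERWAVE_TWIN_UUID": "twin-123"})
print(config["workers_dir"], config["model_dir"])
print(resolve_device(config["model_device"], gpu_usable=lambda: False))
```

Passing the environment as a mapping keeps the helper easy to test without touching the real process environment.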

## Runtime Entrypoint

Worker modules never call `cw.run_edge_workers()` themselves. The container entrypoint does:

```python
from cyberwave import Cyberwave
cw = Cyberwave(api_key=...)
cw.run_edge_workers()  # loads workers, wires hooks, blocks
```
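
One way a runtime could make `cw` available without imports is to execute each worker file with the client pre-bound in its globals. The sketch below illustrates that idea under assumptions; it is not the actual Cyberwave loader, and `FakeClient` and `load_workers` are hypothetical names.

```python
import tempfile
from pathlib import Path


class FakeClient:
    """Hypothetical stand-in for the injected `cw` object; it only records hooks."""

    def __init__(self):
        self.hooks = []  # (channel, callback) pairs collected at load time

    def on_data(self, twin_uuid, channel):
        def decorator(fn):
            self.hooks.append((channel, fn))  # passive: nothing is subscribed yet
            return fn
        return decorator


def load_workers(workers_dir, client):
    """Execute every *.py file in workers_dir with `cw` already defined."""
    for path in sorted(Path(workers_dir).glob("*.py")):
        namespace = {"cw": client, "__name__": path.stem, "__file__": str(path)}
        exec(compile(path.read_text(), str(path), "exec"), namespace)
    return client.hooks


WORKER_SOURCE = '''
@cw.on_data("twin-123", "custom/topic")
def handle(sample, ctx):
    return sample
'''

with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "echo.py").write_text(WORKER_SOURCE)
    hooks = load_workers(tmp, FakeClient())

print([channel for channel, _ in hooks])
```

Because the decorator only appends to a list, loading a module has no side effects beyond registration; wiring the recorded callbacks to live subscriptions is left to the runtime.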

## Lifecycle

Edge Core starts the worker container only when at least one workflow is active for the connected twins (i.e. the latest sync produced one or more `wf_*.py` files in `{config_dir}/workers/`). When no active workflow exists, the `cyberwaveos/edge-ml-worker` image is not pulled at all.

Workflow activations and deactivations are picked up automatically:

* **At startup:** after the workflow sync step, the worker container is started if files exist; otherwise startup logs `No active workflows for connected twins; worker container not started` and proceeds.
* **At runtime:** every \~5 minutes the workers directory is re-synced; the worker container is started when files appear and stopped when files disappear. Both transitions are idempotent.
* **Immediately on activate:** when a `run_on_edge` workflow is activated, the backend publishes a `sync_workflows` MQTT command on each referenced twin's command topic; Edge Core runs an immediate sync and lifecycle reconcile so the new `wf_*.py` lands within seconds.
* **Immediately on remove (deactivate / soft-delete / `run_on_edge` flip off):** the backend publishes a surgical `remove_workflow_worker` MQTT command naming the specific worker filename(s); Edge Core unlinks the file from `{config_dir}/workers/` and immediately reconciles the container lifecycle. The same nudge also fires when a workflow is removed as part of a workspace or environment teardown. Removal propagates within seconds instead of waiting up to \~5 minutes for the periodic sync plus the bulk-sync two-strikes cleanup. The periodic reconcile remains the correctness backstop.

`cyberwave workflow sync <twin_uuid>` from the CLI continues to trigger an immediate sync via MQTT, which reuses the same lifecycle path on the next reconcile.
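
The start/stop rule described in this section can be sketched as an idempotent reconcile over the workers directory. This is a simplified illustration, not Edge Core's implementation: `WorkerContainer`, `reconcile`, and `remove_worker` are hypothetical names, and the container is modelled as an in-memory flag.

```python
import tempfile
from pathlib import Path


class WorkerContainer:
    """Hypothetical handle for the worker container; records start/stop calls."""

    def __init__(self):
        self.running = False
        self.transitions = []

    def start(self):
        self.running = True
        self.transitions.append("start")

    def stop(self):
        self.running = False
        self.transitions.append("stop")


def reconcile(workers_dir: Path, container: WorkerContainer) -> None:
    """Start when wf_*.py files exist, stop when none remain; otherwise do nothing."""
    has_workflows = any(workers_dir.glob("wf_*.py"))
    if has_workflows and not container.running:
        container.start()
    elif not has_workflows and container.running:
        container.stop()


def remove_worker(workers_dir: Path, filename: str, container: WorkerContainer) -> None:
    """Unlink one named worker file, then reconcile immediately."""
    (workers_dir / filename).unlink(missing_ok=True)
    reconcile(workers_dir, container)


with tempfile.TemporaryDirectory() as tmp:
    workers = Path(tmp)
    container = WorkerContainer()
    reconcile(workers, container)            # no files: container stays stopped
    (workers / "wf_demo.py").write_text("")  # a workflow was activated
    reconcile(workers, container)            # container starts
    reconcile(workers, container)            # idempotent: no second start
    remove_worker(workers, "wf_demo.py", container)  # last file removed: stops

print(container.transitions)
```

Running the same `reconcile` from the periodic sync and from the MQTT-triggered paths means the immediate nudges are only an optimization; a missed command is corrected on the next periodic pass.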
