# Synchronized Multi-Channel Hooks

> Approximate time synchronizer that fires a callback when samples from all listed channels arrive within a configurable tolerance.

<Warning>
  **STUB DOCUMENT:** This page is intentionally minimal and will be expanded with deeper technical details in a future update.
</Warning>

`@cw.on_synchronized` is an approximate time synchronizer for edge workers that fuse data from multiple sensors. The callback fires only when the latest samples from all listed channels have timestamps within `tolerance_ms` of each other.

## Usage

```python theme={null}
@cw.on_synchronized(twin_uuid, ["frames/front", "depth/default", "joint_states"], tolerance_ms=50)
def detect_collision(samples, ctx):
    frame = samples["frames/front"].payload
    depth = samples["depth/default"].payload
    joints = samples["joint_states"].payload
    # All three timestamps are within 50 ms of each other
```

## Callback signature

```python theme={null}
def handler(samples: dict[str, Sample], ctx: HookContext) -> None: ...
```

| Parameter       | Type                | Description                                                                                |
| --------------- | ------------------- | ------------------------------------------------------------------------------------------ |
| `samples`       | `dict[str, Sample]` | Dict keyed by channel name (by alias in cross-twin mode); values are `Sample` objects      |
| `ctx.timestamp` | `float`             | Maximum (most recent) of the sample timestamps                                             |
| `ctx.metadata`  | `dict`              | Contains the `synchronized_channels` list                                                  |
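
To make the table concrete, here is a minimal sketch of how the context fields relate to the samples. `Sample` and `HookContext` below are stand-in dataclasses written for illustration (the real SDK classes may carry more fields), and `build_context` is a hypothetical helper, not an SDK function. Timestamps are assumed to be in seconds.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Sample:
    # Stand-in for the SDK's Sample; only the fields used on this page.
    payload: Any
    timestamp: float  # assumed to be seconds


@dataclass
class HookContext:
    # Stand-in for the SDK's HookContext.
    timestamp: float
    metadata: dict = field(default_factory=dict)


def build_context(samples: dict[str, Sample]) -> HookContext:
    """Hypothetical helper: derive the context fields as the table describes."""
    return HookContext(
        # ctx.timestamp is the max of all sample timestamps.
        timestamp=max(s.timestamp for s in samples.values()),
        # ctx.metadata carries the list of synchronized channel names.
        metadata={"synchronized_channels": list(samples)},
    )


samples = {
    "frames/front": Sample(payload=b"...", timestamp=10.00),
    "joint_states": Sample(payload=[0.1, 0.2], timestamp=10.03),
}
ctx = build_context(samples)
```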

## How it works

1. For each channel in the list, a separate data-layer subscription is created.
2. A shared `latest_samples` buffer (one entry per channel) is maintained under a lock.
3. On each incoming sample the buffer is updated, then checked: if all channels are present **and** `max(timestamps) - min(timestamps) <= tolerance_ms / 1000`, the callback fires with a copy of the buffer.
4. Exceptions in the callback are logged; the runtime continues.
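
The steps above can be sketched in plain Python. This is an illustration of the described logic, not the SDK's implementation: `ApproxSynchronizer`, `on_sample`, and the stand-in `Sample` class are hypothetical names, the data-layer subscriptions from step 1 are replaced by direct calls to `on_sample`, and running the callback outside the lock is an assumption. The page does not say whether the buffer is cleared after a match, so the sketch leaves it untouched.

```python
import logging
import threading
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class Sample:
    # Stand-in for the SDK's Sample type.
    payload: Any
    timestamp: float  # assumed to be seconds


class ApproxSynchronizer:
    """Illustrative only: mirrors steps 2-4, not the SDK's actual code."""

    def __init__(self, channels: list[str], tolerance_ms: float,
                 callback: Callable[[dict[str, Sample]], None]) -> None:
        self._channels = list(channels)
        self._tolerance_s = tolerance_ms / 1000
        self._callback = callback
        self._latest: dict[str, Sample] = {}  # step 2: one entry per channel
        self._lock = threading.Lock()

    def on_sample(self, channel: str, sample: Sample) -> None:
        if channel not in self._channels:
            return  # ignore channels this synchronizer was not created for
        # Step 3: update the buffer, then check it, all under the lock.
        with self._lock:
            self._latest[channel] = sample
            if len(self._latest) < len(self._channels):
                return  # at least one channel has not reported yet
            stamps = [s.timestamp for s in self._latest.values()]
            if max(stamps) - min(stamps) > self._tolerance_s:
                return  # spread exceeds the tolerance
            snapshot = dict(self._latest)  # copy handed to the callback
        # Step 4: exceptions are logged and the caller keeps running.
        try:
            self._callback(snapshot)
        except Exception:
            logger.exception("synchronized callback failed")


fired = []
sync = ApproxSynchronizer(["a", "b"], tolerance_ms=50, callback=fired.append)
sync.on_sample("a", Sample("a0", 1.000))  # only one channel present
sync.on_sample("b", Sample("b0", 1.200))  # spread of 200 ms: too wide
sync.on_sample("a", Sample("a1", 1.180))  # spread of 20 ms: callback fires
```

Note that the check always uses the most recent sample per channel, so a stale entry on a slow channel blocks the callback until that channel reports again.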

## Cross-twin mode

For multi-camera or cross-device fusion, use `twin_channels` to synchronize channels from different twins:

```python theme={null}
@cw.on_synchronized(
    twin_channels={
        "left": (CAMERA_LEFT, "frames/default"),
        "right": (CAMERA_RIGHT, "frames/default"),
    },
    tolerance_ms=50.0,
)
def on_stereo_pair(samples, ctx):
    left = samples["left"]   # Sample from CAMERA_LEFT twin
    right = samples["right"] # Sample from CAMERA_RIGHT twin
    # ctx.metadata["twin_uuids"] lists the involved twins
```

The single-twin API (`@cw.on_synchronized(twin_uuid, channels)`) continues to work unchanged.

See [Multi-Camera Detection Routing](/edge/drivers/multi-camera-detection-routing) for an end-to-end example.

## Comparison with `latest(max_age_ms=)`

| Approach                                         | Guarantee                      | Use case                                |
| ------------------------------------------------ | ------------------------------ | --------------------------------------- |
| `cw.data.latest(ch, max_age_ms=50)`              | Single-channel staleness check | Pull-based fusion inside `@cw.on_frame` |
| `@cw.on_synchronized(channels, tolerance_ms=50)` | All channels within tolerance  | Push-based multi-sensor fusion          |
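
The two guarantees differ in what the timestamps are compared against. The sketch below states each check as a plain predicate. It assumes `max_age_ms` is measured against the current time, which this page does not spell out, and both function names are hypothetical rather than part of the SDK.

```python
def is_fresh(sample_ts: float, now: float, max_age_ms: float) -> bool:
    """Single-channel staleness: the sample's age relative to `now` (assumed)."""
    return now - sample_ts <= max_age_ms / 1000


def is_synchronized(timestamps: list[float], tolerance_ms: float) -> bool:
    """Multi-channel spread: newest vs. oldest sample, independent of `now`."""
    return max(timestamps) - min(timestamps) <= tolerance_ms / 1000


now = 100.0
frame_ts, depth_ts = 99.40, 99.42  # 20 ms apart, but roughly 600 ms old

aligned = is_synchronized([frame_ts, depth_ts], tolerance_ms=50)
fresh = is_fresh(depth_ts, now, max_age_ms=50)
```

Under these assumptions, `aligned` is true while `fresh` is false: samples can be mutually consistent yet old, which is why the two approaches answer different questions.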

## Temporal sync roadmap

| Phase | Primitive                                 | Status           |
| ----- | ----------------------------------------- | ---------------- |
| 0     | `timestamp` in every `Sample`             | Done             |
| 1     | `latest(max_age_ms=)`                     | Done             |
| **2** | **`@cw.on_synchronized()`**               | **This feature** |
| 3     | `cw.data.at(channel, t=, interpolation=)` | Planned          |
| 4     | `cw.data.window(channel, from_t=, to_t=)` | Planned          |
