STUB DOCUMENT: This page is intentionally minimal and will be expanded with deeper technical details in a future update.
@cw.on_synchronized is an approximate time synchronizer for edge workers that fuse data from multiple sensors: the callback fires only when the latest samples from all listed channels lie within a configurable tolerance of each other.
Usage
@cw.on_synchronized(twin_uuid, ["frames/front", "depth/default", "joint_states"], tolerance_ms=50)
def detect_collision(samples, ctx):
    frame = samples["frames/front"].payload
    depth = samples["depth/default"].payload
    joints = samples["joint_states"].payload
    # All three samples are within 50 ms of each other
Callback signature
def handler(samples: dict[str, Sample], ctx: HookContext) -> None: ...
| Parameter | Type | Description |
|---|---|---|
| samples | dict[str, Sample] | Dict keyed by channel name; values are Sample objects |
| ctx.timestamp | float | Max of all sample timestamps |
| ctx.metadata | dict | Contains synchronized_channels list |
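
To exercise a handler without a running worker, the callback contract above can be mimicked with stand-in types. This is only a sketch: the `Sample` and `HookContext` dataclasses and the `build_context` helper are hypothetical stand-ins, not the SDK's classes, and seconds as the timestamp unit is an assumption. Only `payload`, the per-sample timestamp, `ctx.timestamp`, and the `synchronized_channels` metadata entry come from this page.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Sample:
    """Hypothetical stand-in for the SDK's Sample type."""
    payload: Any
    timestamp: float  # seconds (assumed unit)


@dataclass
class HookContext:
    """Hypothetical stand-in for the SDK's HookContext type."""
    timestamp: float
    metadata: dict = field(default_factory=dict)


def build_context(samples: dict[str, Sample]) -> HookContext:
    """Build a context as the parameter table describes it."""
    return HookContext(
        timestamp=max(s.timestamp for s in samples.values()),
        metadata={"synchronized_channels": list(samples)},
    )


def handler(samples: dict[str, Sample], ctx: HookContext) -> None:
    # Age of the oldest sample relative to ctx.timestamp, in milliseconds.
    oldest = min(s.timestamp for s in samples.values())
    skew_ms = (ctx.timestamp - oldest) * 1000
    print(f"{ctx.metadata['synchronized_channels']} skew={skew_ms:.0f} ms")


samples = {
    "frames/front": Sample(payload=b"jpeg", timestamp=10.000),
    "depth/default": Sample(payload=b"depth", timestamp=10.020),
}
ctx = build_context(samples)
handler(samples, ctx)
```

Since `ctx.timestamp` is the maximum of the sample timestamps, subtracting the minimum gives the actual skew of the set, which a handler can use to reject borderline matches.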
How it works
- For each channel in the list, a separate data-layer subscription is created.
- A shared latest_samples buffer (one entry per channel) is maintained under a lock.
- On each incoming sample the buffer is updated, then checked: if all channels are present and max(timestamps) - min(timestamps) <= tolerance_ms / 1000, the callback fires with a copy of the buffer.
- Exceptions in the callback are logged; the runtime continues.
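
The steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the runtime's implementation: `ApproxSynchronizer`, `on_sample`, and the `Sample` dataclass are hypothetical names, timestamps are assumed to be in seconds (as `tolerance_ms / 1000` suggests), and running the callback outside the lock is a design choice of this sketch.

```python
import logging
import threading
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger("sync_sketch")


@dataclass
class Sample:
    """Hypothetical stand-in for the SDK's Sample type."""
    payload: Any
    timestamp: float  # seconds (assumed unit)


class ApproxSynchronizer:
    """Latest-sample buffer with a max-min timestamp check."""

    def __init__(self, channels: list[str], tolerance_ms: float,
                 callback: Callable[[dict[str, Sample]], None]) -> None:
        self._channels = set(channels)
        self._tolerance_s = tolerance_ms / 1000
        self._callback = callback
        self._latest: dict[str, Sample] = {}  # one entry per channel
        self._lock = threading.Lock()

    def on_sample(self, channel: str, sample: Sample) -> None:
        """Entry point for each per-channel subscription."""
        if channel not in self._channels:
            return  # not one of the synchronized channels
        with self._lock:
            self._latest[channel] = sample
            if len(self._latest) < len(self._channels):
                return  # at least one channel has not reported yet
            stamps = [s.timestamp for s in self._latest.values()]
            if max(stamps) - min(stamps) > self._tolerance_s:
                return  # samples are too far apart in time
            snapshot = dict(self._latest)  # copy handed to the callback
        try:
            self._callback(snapshot)
        except Exception:
            # Log and keep going, mirroring the last step above.
            logger.exception("synchronized callback failed")


fired: list[dict[str, Sample]] = []
sync = ApproxSynchronizer(["a", "b"], tolerance_ms=50, callback=fired.append)
sync.on_sample("a", Sample("a0", 10.00))  # "b" missing: no callback
sync.on_sample("b", Sample("b0", 10.02))  # skew 20 ms: callback fires
sync.on_sample("a", Sample("a1", 10.20))  # skew 180 ms: no callback
```

The buffer is not cleared after a match in this sketch, so one sample can appear in more than one callback; whether the runtime behaves the same way is not stated on this page.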
Comparison with latest(max_age_ms=)
| Approach | Guarantee | Use case |
|---|---|---|
| cw.data.latest(ch, max_age_ms=50) | Single-channel staleness check | Pull-based fusion inside @cw.on_frame |
| @cw.on_synchronized(channels, tolerance_ms=50) | All channels within tolerance | Push-based multi-sensor fusion |
Temporal sync roadmap
| Phase | Primitive | Status |
|---|---|---|
| 0 | timestamp in every Sample | Done |
| 1 | latest(max_age_ms=) | Done |
| 2 | @cw.on_synchronized() | This feature |
| 3 | cw.data.at(channel, t=, interpolation=) | Planned |
| 4 | cw.data.window(channel, from_t=, to_t=) | Planned |