STUB DOCUMENT: This page is intentionally minimal and will be expanded with deeper technical details in a future update.
@cw.on_synchronized is an approximate time synchronizer for edge workers that fuse data from multiple sensors. The callback fires only when the latest samples from all listed channels lie within tolerance_ms of one another.

Usage

@cw.on_synchronized(twin_uuid, ["frames/front", "depth/default", "joint_states"], tolerance_ms=50)
def detect_collision(samples, ctx):
    frame = samples["frames/front"].payload
    depth = samples["depth/default"].payload
    joints = samples["joint_states"].payload
    # All three are within 50ms of each other

Callback signature

def handler(samples: dict[str, Sample], ctx: HookContext) -> None: ...

| Parameter | Type | Description |
| --- | --- | --- |
| samples | dict[str, Sample] | Dict keyed by channel name; values are Sample objects |
| ctx.timestamp | float | Maximum of all sample timestamps |
| ctx.metadata | dict | Contains the synchronized_channels list |

How it works

  1. For each channel in the list, a separate data-layer subscription is created.
  2. A shared latest_samples buffer (one entry per channel) is maintained under a lock.
  3. On each incoming sample the buffer is updated, then checked: if all channels are present and max(timestamps) - min(timestamps) <= tolerance_ms / 1000, the callback fires with a copy of the buffer.
  4. Exceptions in the callback are logged; the runtime continues.
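
The steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the runtime's actual implementation: `ApproxSynchronizer`, `on_sample`, and the `Sample` dataclass here are hypothetical stand-ins, timestamps are assumed to be in seconds, and the data-layer subscriptions from step 1 are replaced by direct calls to `on_sample`.

```python
import logging
import threading
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable


@dataclass
class Sample:
    # Hypothetical stand-in for the SDK's Sample type.
    timestamp: float  # assumed to be in seconds
    payload: Any


class ApproxSynchronizer:
    """Illustrative latest-sample synchronizer (hypothetical class)."""

    def __init__(self, channels: Iterable[str], tolerance_ms: float,
                 callback: Callable[[Dict[str, Sample]], None]) -> None:
        self._channels = set(channels)
        self._tolerance_s = tolerance_ms / 1000
        self._callback = callback
        self._latest: Dict[str, Sample] = {}  # one entry per channel
        self._lock = threading.Lock()

    def on_sample(self, channel: str, sample: Sample) -> bool:
        """Update the buffer; return True if the callback was invoked."""
        if channel not in self._channels:
            return False
        with self._lock:
            self._latest[channel] = sample
            if len(self._latest) < len(self._channels):
                return False  # some channel has not reported yet
            stamps = [s.timestamp for s in self._latest.values()]
            if max(stamps) - min(stamps) > self._tolerance_s:
                return False  # spread exceeds the tolerance
            snapshot = dict(self._latest)  # copy handed to the callback
        try:
            self._callback(snapshot)
        except Exception:
            # Log and keep going, mirroring step 4.
            logging.exception("synchronized callback failed")
        return True


fired = []
sync = ApproxSynchronizer(["frames/front", "joint_states"], 50, fired.append)
sync.on_sample("frames/front", Sample(10.00, "frame-0"))
sync.on_sample("joint_states", Sample(10.20, "joints-0"))  # 200 ms apart: no fire
sync.on_sample("frames/front", Sample(10.18, "frame-1"))   # 20 ms apart: fires
```

In this sketch the callback runs outside the lock so that a slow handler does not block incoming samples, and the buffer is not cleared after a match, so a sample can be delivered in more than one callback. Whether the real runtime behaves the same way on either point is not stated here.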

Comparison with latest(max_age_ms=)

| Approach | Guarantee | Use case |
| --- | --- | --- |
| cw.data.latest(ch, max_age_ms=50) | Single-channel staleness check | Pull-based fusion inside @cw.on_frame |
| @cw.on_synchronized(channels, tolerance_ms=50) | All channels within tolerance | Push-based multi-sensor fusion |
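
The difference between the two guarantees can be shown with two small predicates. This is only a sketch: `fresh_enough` and `mutually_aligned` are hypothetical helpers, not SDK functions, and it assumes that `max_age_ms` measures a sample's age against the current time and that timestamps are in seconds.

```python
def fresh_enough(sample_ts: float, now: float, max_age_ms: float) -> bool:
    # Staleness check: one sample's age relative to the current time.
    return (now - sample_ts) * 1000 <= max_age_ms


def mutually_aligned(timestamps: list[float], tolerance_ms: float) -> bool:
    # Synchronization check: spread between samples, regardless of their age.
    return (max(timestamps) - min(timestamps)) * 1000 <= tolerance_ms


stamps = [10.00, 10.03]  # two channels, 30 ms apart
now = 10.20              # both samples are well over 50 ms old

aligned = mutually_aligned(stamps, tolerance_ms=50)                   # True
all_fresh = all(fresh_enough(t, now, max_age_ms=50) for t in stamps)  # False
```

Under these assumptions, the staleness check bounds each sample against the clock, while the tolerance check bounds the samples against each other.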

Temporal sync roadmap

| Phase | Primitive | Status |
| --- | --- | --- |
| 0 | timestamp in every Sample | Done |
| 1 | latest(max_age_ms=) | Done |
| 2 | @cw.on_synchronized() | This feature |
| 3 | cw.data.at(channel, t=, interpolation=) | Planned |
| 4 | cw.data.window(channel, from_t=, to_t=) | Planned |