Skip to main content
STUB DOCUMENT: This page is intentionally minimal and will be expanded with deeper technical details in a future update.
The cw.data module provides time-aware fusion primitives for edge workers that fuse multiple sensor streams.

data.at() — Interpolated point read

Query a channel at an arbitrary timestamp. The SDK interpolates between the two nearest buffered samples. Returns None for unknown channels. When t falls outside the buffered range, the nearest boundary sample is returned (constant extrapolation).
@cw.on_frame(twin_uuid)
def servoing(frame, ctx):
    joints = cw.data.at("joint_states", t=ctx.timestamp, interpolation="linear")

Interpolation strategies

StrategyUse caseBehavior
"linear"Scalar, vector, dict, numpy, QuaternionElement-wise linear interpolation (NLERP for quaternions)
"slerp"Quaternion channels (attitude, end-effector pose)Spherical linear interpolation
"nearest"Non-numeric or discrete channelsReturns closest sample by time
"none"Exact match onlyReturns None if no exact match
Dict interpolation operates over the union of keys from both samples and emits a UserWarning when the schemas diverge (schema drift detection).

Quaternion type

SLERP only triggers when values are wrapped in Quaternion. Plain list[float] of length 4 will not be SLERPed — this avoids ambiguity with RGBA colors, 4-DOF joint arrays, etc.
from cyberwave.data import Quaternion

cw.data.ingest("orientation", ts, Quaternion(x=0, y=0, z=0.707, w=0.707))
pose = cw.data.at("orientation", t=ctx.timestamp, interpolation="slerp")
# pose is a Quaternion instance
Convention: Hamilton (x, y, z, w) — the same as ROS, MuJoCo, and the Cyberwave wire format.

data.window() — Time-range query

Return all buffered samples within a time range. The returned WindowResult is iterable.
imu_samples = cw.data.window("imu", from_t=prev_frame_ts, to_t=ctx.timestamp)
recent_ft = cw.data.window("force_torque", duration_ms=100)

for sample in recent_ft:
    process(sample.ts, sample.value)
Returns an empty WindowResult for unknown channels.

Use cases

Use casePrimitiveWhy
Force-reactive graspingdata.at(t=ctx.timestamp)Force reading must match arm pose at contact
Conveyor pick-and-placedata.at(t=ctx.timestamp)Object position at detection time projected forward
VIO pre-integrationdata.window(from_t=, to_t=)Full IMU series between camera keyframes
Force filteringdata.window(duration_ms=100)Moving average over recent F/T samples
Jerk estimationdata.window(duration_ms=50)Second derivatives require recent position history

Buffer configuration

Each channel gets a deque-backed ring buffer (default: 1000 samples, O(1) eviction). Configure per-channel depth before the first ingest():
cw.data.configure_channel("imu", capacity=5000)
Calling configure_channel after samples have already been ingested raises ValueError.

Custom clock

FusionLayer accepts an optional clock argument (defaults to time.time). Use this to align with a monotonic or sim-time source:
from cyberwave.data import FusionLayer
import time

fusion = FusionLayer(clock=time.monotonic)