STUB DOCUMENT: This page is intentionally minimal and will be expanded with deeper technical details in a future update.
The cw.data module provides time-aware fusion primitives for edge workers that fuse multiple sensor streams.
data.at() — Interpolated point read
Query a channel at an arbitrary timestamp. The SDK interpolates between the two nearest buffered samples. Returns None for unknown channels. When t falls outside the buffered range, the nearest boundary sample is returned (constant extrapolation).
@cw.on_frame(twin_uuid)
def servoing(frame, ctx):
joints = cw.data.at("joint_states", t=ctx.timestamp, interpolation="linear")
Interpolation strategies
| Strategy | Use case | Behavior |
|---|
"linear" | Scalar, vector, dict, numpy, Quaternion | Element-wise linear interpolation (NLERP for quaternions) |
"slerp" | Quaternion channels (attitude, end-effector pose) | Spherical linear interpolation |
"nearest" | Non-numeric or discrete channels | Returns closest sample by time |
"none" | Exact match only | Returns None if no exact match |
Dict interpolation operates over the union of keys from both samples and emits a UserWarning when the schemas diverge (schema drift detection).
Quaternion type
SLERP only triggers when values are wrapped in Quaternion. Plain list[float] of length 4 will not be SLERPed — this avoids ambiguity with RGBA colors, 4-DOF joint arrays, etc.
from cyberwave.data import Quaternion
cw.data.ingest("orientation", ts, Quaternion(x=0, y=0, z=0.707, w=0.707))
pose = cw.data.at("orientation", t=ctx.timestamp, interpolation="slerp")
# pose is a Quaternion instance
Convention: Hamilton (x, y, z, w) — the same as ROS, MuJoCo, and the Cyberwave wire format.
data.window() — Time-range query
Return all buffered samples within a time range. The returned WindowResult is iterable.
imu_samples = cw.data.window("imu", from_t=prev_frame_ts, to_t=ctx.timestamp)
recent_ft = cw.data.window("force_torque", duration_ms=100)
for sample in recent_ft:
process(sample.ts, sample.value)
Returns an empty WindowResult for unknown channels.
Use cases
| Use case | Primitive | Why |
|---|
| Force-reactive grasping | data.at(t=ctx.timestamp) | Force reading must match arm pose at contact |
| Conveyor pick-and-place | data.at(t=ctx.timestamp) | Object position at detection time projected forward |
| VIO pre-integration | data.window(from_t=, to_t=) | Full IMU series between camera keyframes |
| Force filtering | data.window(duration_ms=100) | Moving average over recent F/T samples |
| Jerk estimation | data.window(duration_ms=50) | Second derivatives require recent position history |
Buffer configuration
Each channel gets a deque-backed ring buffer (default: 1000 samples, O(1) eviction). Configure per-channel depth before the first ingest():
cw.data.configure_channel("imu", capacity=5000)
Calling configure_channel after samples have already been ingested raises ValueError.
Custom clock
FusionLayer accepts an optional clock argument (defaults to time.time). Use this to align with a monotonic or sim-time source:
from cyberwave.data import FusionLayer
import time
fusion = FusionLayer(clock=time.monotonic)