> ## Documentation Index
> Fetch the complete documentation index at: https://docs.cyberwave.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Data Fusion Primitives

> Time-aware sensor fusion: interpolated point reads and time-window queries for multi-sensor edge workers.

<Warning>
  **STUB DOCUMENT:** This page is intentionally minimal and will be expanded with deeper technical details in a future update.
</Warning>

The `cw.data` module provides time-aware fusion primitives for edge workers that fuse multiple sensor streams.

## `data.at()` — Interpolated point read

Query a channel at an arbitrary timestamp. The SDK interpolates between the two nearest buffered samples. Returns `None` for unknown channels. When `t` falls outside the buffered range, the nearest boundary sample is returned (constant extrapolation).

```python theme={null}
@cw.on_frame(twin_uuid)
def servoing(frame, ctx):
    joints = cw.data.at("joint_states", t=ctx.timestamp, interpolation="linear")
```

### Interpolation strategies

| Strategy    | Use case                                            | Behavior                                                  |
| ----------- | --------------------------------------------------- | --------------------------------------------------------- |
| `"linear"`  | Scalar, vector, dict, numpy, `Quaternion`           | Element-wise linear interpolation (NLERP for quaternions) |
| `"slerp"`   | `Quaternion` channels (attitude, end-effector pose) | Spherical linear interpolation                            |
| `"nearest"` | Non-numeric or discrete channels                    | Returns closest sample by time                            |
| `"none"`    | Exact match only                                    | Returns `None` if no exact match                          |

Dict interpolation operates over the **union** of keys from both samples and emits a `UserWarning` when the schemas diverge (schema drift detection).

### `Quaternion` type

SLERP only triggers when values are wrapped in `Quaternion`. Plain `list[float]` of length 4 will **not** be SLERPed — this avoids ambiguity with RGBA colors, 4-DOF joint arrays, etc.

```python theme={null}
from cyberwave.data import Quaternion

cw.data.ingest("orientation", ts, Quaternion(x=0, y=0, z=0.707, w=0.707))
pose = cw.data.at("orientation", t=ctx.timestamp, interpolation="slerp")
# pose is a Quaternion instance
```

Convention: Hamilton `(x, y, z, w)` — the same as ROS, MuJoCo, and the Cyberwave wire format.

## `data.window()` — Time-range query

Return all buffered samples within a time range. The returned `WindowResult` is iterable.

```python theme={null}
imu_samples = cw.data.window("imu", from_t=prev_frame_ts, to_t=ctx.timestamp)
recent_ft = cw.data.window("force_torque", duration_ms=100)

for sample in recent_ft:
    process(sample.ts, sample.value)
```

Returns an empty `WindowResult` for unknown channels.

## Use cases

| Use case                | Primitive                      | Why                                                 |
| ----------------------- | ------------------------------ | --------------------------------------------------- |
| Force-reactive grasping | `data.at(t=ctx.timestamp)`     | Force reading must match arm pose at contact        |
| Conveyor pick-and-place | `data.at(t=ctx.timestamp)`     | Object position at detection time projected forward |
| VIO pre-integration     | `data.window(from_t=, to_t=)`  | Full IMU series between camera keyframes            |
| Force filtering         | `data.window(duration_ms=100)` | Moving average over recent F/T samples              |
| Jerk estimation         | `data.window(duration_ms=50)`  | Second derivatives require recent position history  |

## Buffer configuration

Each channel gets a `deque`-backed ring buffer (default: 1000 samples, O(1) eviction). Configure per-channel depth before the first `ingest()`:

```python theme={null}
cw.data.configure_channel("imu", capacity=5000)
```

Calling `configure_channel` after samples have already been ingested raises `ValueError`.

## Custom clock

`FusionLayer` accepts an optional `clock` argument (defaults to `time.time`). Use this to align with a monotonic or sim-time source:

```python theme={null}
from cyberwave.data import FusionLayer
import time

fusion = FusionLayer(clock=time.monotonic)
```
