The Cyberwave Python SDK (cyberwave) is a unified client that wraps both the Cyberwave REST API and MQTT API into a single Python interface. It handles authentication, protocol negotiation, and message serialization so you can interact with digital twins, control physical robots, stream video, manage workflows, and handle alerts, all using Python.
Capability
Protocol
Description
Digital Twin management
REST
Create, query, move, and configure digital twins
Joint control
MQTT
Send real-time joint position commands (degrees or radians)
Position, rotation, scale
MQTT
Update a twin’s 3D pose in the environment
Video streaming
WebRTC + MQTT
Stream camera feeds (standard + RealSense) to the cloud
Frame capture
WebRTC
Grab individual frames as files, numpy arrays, PIL images, or raw bytes
Asset catalog
REST
Search, create, and upload robot assets (including large GLB files)
Workspace and project management
REST
List workspaces, create projects and environments
Edge management
REST
Register, update, and delete edge devices
Workflows
REST
List, trigger, and monitor workflow executions
Alerts
REST
Create, acknowledge, resolve, and manage twin alerts
ML Model catalog
REST
List, get, and delete ML model records in your workspace
ML inference
Local / Cloud
Load a model and run .predict() on images at the edge or in the cloud
For camera streaming, install the optional extras:
Standard Cameras (OpenCV)
Intel RealSense (RGB + Depth)
Host Microphones
pip install cyberwave[camera]
pip install cyberwave[realsense]
On ARM64/Raspberry Pi, you’ll also need the librealsense SDK installed on your system. On x86_64, install it via sudo apt install librealsense2 or use pre-built wheels. On Raspberry Pi OS (ARM64), you must build librealsense from source.
pip install cyberwave[microphone]
Use this extra for direct host microphone capture helpers based on sounddevice, WebRTC audio (aiortc/av), and Linux hotplug monitoring via pyudev.
Use cw.affect() to decide whether high-level robot actions should impact the simulation or the live robot.
from cyberwave import Cyberwavecw = Cyberwave()cw.affect("simulation")rover = cw.twin("unitree/go2")rover.move_forward(distance=1.0) # moves the digital twincw.affect("live")rover.move_forward(distance=1.0) # moves the physical robot
Accepted values are:
Value
Effect
"simulation"
Sends high-level actions to the simulator and updates the digital twin in the Cyberwave environment
"live"
Sends high-level actions to the live robot through Cyberwave’s live control path
"real-world" is also accepted as an alias for "live".affect() is chainable, so you can also write cw.affect("simulation").twin("unitree/go2").
cw.twin() is the primary method for working with digital twins. Pass an asset registry ID (vendor/model) to create or retrieve a twin:
robot = cw.twin("the-robot-studio/so101")
This method:
Queries the Asset Catalog (REST) to resolve the asset by registry ID
Creates a new twin instance in your environment, or retrieves an existing one
Returns a capability-specific Twin class based on the asset’s metadata
You can also target a specific environment or retrieve a twin by UUID or unified slug:
# By UUIDrobot = cw.twin("the-robot-studio/so101", environment_id="YOUR_ENVIRONMENT_UUID")twin = cw.twin(twin_id="YOUR_TWIN_UUID")# By unified slugrobot = cw.twin("acme/catalog/so101", environment_id="acme/envs/production")twin = cw.twin(twin_id="acme/twins/my-arm")
Slugs and UUIDs are interchangeable wherever an identifier is accepted. Access an entity’s slug via the .slug property (e.g. twin.slug).If no default environment is configured (via CYBERWAVE_ENVIRONMENT_ID or environment_id parameter), the SDK auto-creates a “Quickstart Environment” and places the twin there.
Send real-time joint commands to a robot arm or any articulated twin. You can use degrees or radians:
import mathcw.affect("simulation")robot = cw.twin("the-robot-studio/so101")robot.joints.set("shoulder_pan", 45, degrees=True)robot.joints.set("elbow_flex", math.pi/4, degrees=False)angle = robot.joints.get("shoulder_pan")joint_names = robot.joints.list()# Get all joint states as a dict {name: radians}all_joints = robot.joints.get_all()# Print a human-readable table of all joint states (radians + degrees)robot.joints.print_joint_states()
print_joint_states() fetches the latest state from the server and prints a formatted table:
Joint states for twin <twin-uuid>:------------------------------------------------------Joint Radians Degrees------------------------------------------------------elbow_flex 0.0000 rad 0.00 °shoulder_pan 0.7854 rad 45.00 °------------------------------------------------------
Method
Description
joints.set(name, value, degrees=True)
Set a joint to a target position
joints.get(name)
Get the current position of a specific joint (radians)
joints.list()
List all joint names for the twin
joints.get_all()
Get all joint states as a dict[name, radians]
joints.print_joint_states()
Print all joint states in a human-readable table (radians + degrees)
There’s also a camera namespace with convenience methods:
frame = robot.camera.read() # numpy array (default)path = robot.camera.snapshot() # save JPEG to temp filepath = robot.camera.snapshot("out.jpg") # save to a specific path
The fingerprint is a stable hardware identifier derived from hostname, OS, architecture, and MAC address. The Edge Core generates and persists it automatically at ~/.cyberwave/fingerprint.json on first boot.
List, trigger, and monitor workflows programmatically:
cw = Cyberwave()for wf in cw.workflows.list(): print(f"{wf.name} ({wf.slug}) — {wf.status}")# Trigger by UUID or unified slugrun = cw.workflows.trigger( "acme/workflows/pick-and-place", inputs={"target_position": [1.0, 2.0, 0.0], "speed": 0.5},)run.wait(timeout=60)print(run.status, run.result)
You can also start from a Workflow object:
# Get by UUID or slugwf = cw.workflows.get("acme/workflows/pick-and-place")run = wf.trigger(inputs={"speed": 1.0})run.wait()# Dedicated slug lookupwf = cw.workflows.get_by_slug("acme/workflows/pick-and-place")
List and filter past runs:
runs = cw.workflow_runs.list(workflow_id="workflow-uuid", status="error")for r in runs: print(r.uuid, r.status, r.error)
Check if a workflow is currently running:
wf = cw.workflows.get("workflow-uuid")if wf.is_running(): print("Workflow is currently executing")# Or using the manager directlyif cw.workflows.is_running("workflow-uuid"): print("Workflow is currently executing")
is_running() returns True when any run has status running, waiting, or requested.
The SDK exposes typed agent namespaces under cw.agents. stub: use direct resource APIs for deterministic commands, and agent APIs when you want backend planning, previews, setup guidance, or explicit dispatch.
cw.agents.environment: environment editor agent messages and agent-created environments.
cw.agents.workflow: workflow planning, preview, setup-and-draft, and constrained workflow edits.
cw.agents.control: control surfaces, route/action planning, route resolution, and explicit dispatch.
cw.agents.embodiment: server-built embodiment context for an environment or twin.
cw.control is a convenience alias for cw.agents.control.Use cw.agents.control.surfaces(...) to inspect metadata-derived twin controls, then cw.agents.control.plan(...) or cw.agents.control.resolve_route(...) to get a plan. Dispatch one selected action with cw.control.dispatch(..., confirmed=True) and monitor it with cw.actions.get_status(...) or cw.actions.wait(...).Relative movement is available through the same navigation surface: twin.navigation.relative_move([-1, 0, 0], frame="body", metadata={"source": "control_agent"}) moves backward one meter in the robot body frame.
surfaces = cw.agents.control.surfaces("environment-uuid")print(surfaces[0]["controls"])plan = cw.agents.control.plan( "environment-uuid", "Move the Go2 to Waypoint A", twin_uuid="twin-uuid", mode="simulation",)action = cw.control.dispatch("environment-uuid", plan["dispatchable_actions"][0], confirmed=True)cw.actions.wait(action["action_id"], twin_uuid="twin-uuid", timeout=60)
draft = cw.agents.workflow.plan( "environment-uuid", "inspect every pallet and alert if damage is detected",)preview = cw.agents.workflow.preview( "environment-uuid", "inspect every pallet and alert if damage is detected",)context = cw.agents.embodiment.context( "environment-uuid", twin_uuid="twin-uuid",)
Create, manage, and respond to alerts on a twin. Alerts notify operators that action is needed (e.g., a robot needs calibration or a sensor reading is out of range).
for a in twin.alerts.list(status="active"): print(a.name, a.severity, a.status)alert.acknowledge()alert.resolve()alert.silence()alert.update(severity="critical")alert.delete()
To bypass backend deduplication and always create a new alert:
# Import from HuggingFace.# Idempotent by default: if the same repo was already imported it is reused.# Pass reuse_existing=False to force a fresh import.ds = cw.datasets.add("lerobot/pusht", name="pusht")# Import with specific revision / subsetds = cw.datasets.add( "lerobot/aloha_sim_insertion_human", name="aloha-insertion", hf_revision="main", hf_subset="default",)# Upload a local dataset (directory or zip)ds = cw.datasets.add("./my_lerobot_dataset", name="my-dataset")# List, get, deletedatasets = cw.datasets.list(limit=20, processing_status="completed")ds = cw.datasets.get("dataset-uuid")cw.datasets.delete("dataset-uuid")# Get the frontend URL (does not print — caller decides what to do with it)url = cw.datasets.visualize(ds)print(url) # → https://cyberwave.com/acme/datasets/pusht# Wait until the async HuggingFace import completes.# Default on_poll prints one status line per poll; pass on_poll=None to silence.ds = cw.datasets.wait_until_ready(ds)# With custom callback:ds = cw.datasets.wait_until_ready( ds, poll_interval=5.0, timeout=1800, on_poll=lambda d: print(f"{d.processing_status} {d.processed_episodes}/{d.total_episodes}"),)# Fully silent (for libraries / production):ds = cw.datasets.wait_until_ready(ds, on_poll=None)
Both calls are idempotent — if a conversion artifact already exists it is returned immediately; otherwise conversion is kicked off automatically.
# Block until backend conversion is done; returns the signed download URL.# Default on_poll prints one status line per poll; pass on_poll=None to silence.url = cw.datasets.convert(ds, "rlds")print(url) # signed URL valid for 24 h# Convert AND stream the zip to disk in one call.path = cw.datasets.download(ds, "rlds", dest="./data")print(path) # absolute path to saved file# Silence progress output:url = cw.datasets.convert(ds, "rlds", on_poll=None)path = cw.datasets.download(ds, "rlds", dest="./data", on_poll=None)
format
Description
parquet
Cyberwave joined-parquet zip (native)
lerobot3
LeRobot v3 — recommended for LeRobot training pipelines
# All models visible in your workspacefor m in cw.models.list(): print(f"{m.slug:55s} {m.deployment} sdk_load_id={m.sdk_load_id}")# Filter by deployment target (server-side)edge_models = cw.models.list(deployment="edge")cloud_models = cw.models.list(deployment="cloud")# Client-side shorthands (ANDed): e.g. edge + image-capable rowsedge_vision = cw.models.list(filters=["edge", "image"])public_models = cw.models.list_public()# Get a single recordm = cw.models.get("acme/models/yolov8-nano")m = cw.models.get_by_uuid("3f2a1b4c-…")
from PIL import Image# Pass a catalog entry directly — SDK resolves the right load keyentry = cw.models.list(deployment="edge")[0]model = cw.models.load(entry)# Or pass a string (filename, slug, or UUID)model = cw.models.load("yolo26n.pt")pred = model.predict(Image.open("frame.jpg").convert("RGB"), confidence=0.25)print(pred.describe())for d in pred: print(d.label, d.confidence, d.bbox)