Skip to main content

Cyberwave is in Private Beta.

Request early access to get access to the Cyberwave dashboard.

Overview

The Cyberwave Python SDK (cyberwave) is a unified client that wraps both the Cyberwave REST API and MQTT API into a single Python interface. It handles authentication, protocol negotiation, and message serialization so you can interact with digital twins, control physical robots, stream video, manage workflows, and handle alerts, all using Python.
CapabilityProtocolDescription
Digital Twin managementRESTCreate, query, move, and configure digital twins
Joint controlMQTTSend real-time joint position commands (degrees or radians)
Position, rotation, scaleMQTTUpdate a twin’s 3D pose in the environment
Video streamingWebRTC + MQTTStream camera feeds (standard + RealSense) to the cloud
Frame captureWebRTCGrab individual frames as files, numpy arrays, PIL images, or raw bytes
Asset catalogRESTSearch, create, and upload robot assets (including large GLB files)
Workspace and project managementRESTList workspaces, create projects and environments
Edge managementRESTRegister, update, and delete edge devices
WorkflowsRESTList, trigger, and monitor workflow executions
AlertsRESTCreate, acknowledge, resolve, and manage twin alerts

Installation

pip install cyberwave
For camera streaming, install the optional extras:
pip install cyberwave[camera]
Video streaming also requires FFMPEG:
# macOS
brew install ffmpeg pkg-config

# Ubuntu
sudo apt-get install ffmpeg
Requirements:
  • Python 3.10 or higher
  • A Cyberwave API key (generate one from your Profile page)

Quick Start

from cyberwave import Cyberwave
import math, asyncio

cw = Cyberwave(api_key="your_api_key_here")

# ──────────────────────────────────────────────
# 1. Create a digital twin from the asset catalog
# ──────────────────────────────────────────────

robot = cw.twin("the-robot-studio/so101")

# Target a specific environment (optional)
# robot = cw.twin("the-robot-studio/so101", environment_id="ENV_UUID")

# ──────────────────────────────────────────────
# 2. Position, rotate, and scale the twin
# ──────────────────────────────────────────────

robot.edit_position(x=1.0, y=0.0, z=0.5)
robot.edit_rotation(yaw=90)
robot.edit_scale(x=1.5, y=1.5, z=1.5)

# ──────────────────────────────────────────────
# 3. Control joints (degrees or radians)
# ──────────────────────────────────────────────

robot.joints.set("shoulder_joint", 45, degrees=True)
robot.joints.set("elbow_joint", math.pi / 4, degrees=False)

angle = robot.joints.get("shoulder_joint")
all_joints = robot.joints.get_all()
print(f"Joint names: {robot.joints.list()}")

# ──────────────────────────────────────────────
# 4. Capture a camera frame (no streaming setup needed)
# ──────────────────────────────────────────────

path  = robot.capture_frame()           # save temp JPEG, returns path
frame = robot.capture_frame("numpy")    # numpy BGR array
image = robot.capture_frame("pil")      # PIL.Image object

# ──────────────────────────────────────────────
# 5. Search the asset catalog
# ──────────────────────────────────────────────

for asset in cw.assets.search("unitree"):
    print(f"{asset.registry_id}: {asset.name}")

# ──────────────────────────────────────────────
# 6. Manage workspaces, projects, and environments
# ──────────────────────────────────────────────

workspaces = cw.workspaces.list()
project = cw.projects.create(name="My Project", description="Quickstart project", workspace_id=workspaces[0].uuid)
env = cw.environments.create(name="Lab Setup", description="Test environment", project_id=project.uuid)

# ──────────────────────────────────────────────
# 7. Trigger a workflow and wait for completion
# ──────────────────────────────────────────────

run = cw.workflows.trigger("workflow-uuid", inputs={"speed": 0.5})
run.wait(timeout=60)
print(f"Workflow finished: {run.status}")

# ──────────────────────────────────────────────
# 8. Create and manage alerts on a twin
# ──────────────────────────────────────────────

alert = robot.alerts.create(
    name="Calibration needed",
    description="Joint 3 drifting beyond tolerance",
    severity="warning",
    alert_type="calibration_needed",
)
alert.acknowledge()
alert.resolve()

# ──────────────────────────────────────────────
# 9. Stream live video (async)
# ──────────────────────────────────────────────

camera = cw.twin("cyberwave/standard-cam")

async def stream():
    await camera.stream_video_background()
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await camera.stop_streaming()
        cw.disconnect()

# asyncio.run(stream())

# ──────────────────────────────────────────────
# 10. Manage edge devices
# ──────────────────────────────────────────────

for edge in cw.edges.list():
    print(f"{edge.name}{edge.uuid}")

Authentication

The SDK authenticates using an API key. You can provide it in two ways: Option 1: Environment variable (recommended):
export CYBERWAVE_API_KEY=your_api_key_here
from cyberwave import Cyberwave

cw = Cyberwave()
Option 2: Explicit API key:
from cyberwave import Cyberwave

cw = Cyberwave(api_key="your_api_key_here")
You can also set a default environment so cw.twin() always targets the same environment:
export CYBERWAVE_API_KEY="your_api_key_here"
export CYBERWAVE_ENVIRONMENT_ID="your_environment_uuid"
With both set, cw.twin("the-robot-studio/so101") will return the first SO101 twin in that environment, or create one if it doesn’t exist.
1

Generate an API key

Go to the Cyberwave dashboard → ProfileAPI Tokens and create a new key.
2

Set the environment variable

Export CYBERWAVE_API_KEY in your shell or add it to your .env file.
3

Initialize the client

Call Cyberwave(), the SDK picks up the key and establishes connections to both the REST and MQTT endpoints.

Affect Simulation vs Live Environment

Use cw.affect() to decide whether high-level robot actions should impact the simulation or the live robot.
from cyberwave import Cyberwave

cw = Cyberwave()

cw.affect("simulation")
rover = cw.twin("unitree/go2")
rover.move_forward(distance=1.0)   # moves the digital twin

cw.affect("live")
rover.move_forward(distance=1.0)   # moves the physical robot
Accepted values are:
ValueEffect
"simulation"Sends high-level actions to the simulator and updates the digital twin in the Cyberwave environment
"live"Sends high-level actions to the live robot through Cyberwave’s live control path
"real-world" is also accepted as an alias for "live". affect() is chainable, so you can also write cw.affect("simulation").twin("unitree/go2").

Digital Twins

Create or Retrieve a Twin

cw.twin() is the primary method for working with digital twins. Pass an asset registry ID (vendor/model) to create or retrieve a twin:
robot = cw.twin("the-robot-studio/so101")
This method:
  1. Queries the Asset Catalog (REST) to resolve the asset by registry ID
  2. Creates a new twin instance in your environment, or retrieves an existing one
  3. Returns a capability-specific Twin class based on the asset’s metadata
You can also target a specific environment or retrieve a twin by UUID:
robot = cw.twin("the-robot-studio/so101", environment_id="YOUR_ENVIRONMENT_UUID")

twin = cw.twin(twin_id="YOUR_TWIN_UUID")
If no default environment is configured (via CYBERWAVE_ENVIRONMENT_ID or environment_id parameter), the SDK auto-creates a “Quickstart Environment” and places the twin there.

Position, Rotation, and Scale

Update a twin’s 3D pose in the environment:
robot.edit_position(x=1.0, y=0.0, z=0.5)
robot.edit_position([1.0, 0.5, 0.0])

robot.edit_rotation(yaw=90)

robot.edit_scale(x=1.5, y=1.5, z=1.5)
Both keyword arguments and list format are supported for edit_position.

Move a Twin Between Environments

robot.add_to_environment("TARGET_ENVIRONMENT_UUID")
This operation:
  • Creates a deep copy of the twin in the target environment
  • Marks the original twin as deleted
  • Deletes the source environment if it has no remaining twins

Joint Control

Send real-time joint commands to a robot arm or any articulated twin. You can use degrees or radians:
import math

cw.affect("simulation")
robot = cw.twin("the-robot-studio/so101")

robot.joints.set("shoulder_joint", 45, degrees=True)

robot.joints.set("elbow_joint", math.pi/4, degrees=False)

angle = robot.joints.get("shoulder_joint")

joint_names = robot.joints.list()

all_joints = robot.joints.get_all()
MethodDescription
joints.set(name, value, degrees=True)Set a joint to a target position
joints.get(name)Get the current position of a specific joint
joints.list()List all joint names for the twin
joints.get_all()Get all joint states at once

Frame Capture

Capture the latest camera frame from a twin without setting up a full video stream:
robot = cw.twin("the-robot-studio/so101")

path = robot.capture_frame()                    # temp JPEG file path
frame = robot.capture_frame("numpy")            # numpy BGR array (requires numpy + opencv-python)
image = robot.capture_frame("pil")              # PIL.Image (requires Pillow)
raw = robot.capture_frame("bytes")              # raw JPEG bytes
Batch capture multiple frames:
folder = robot.capture_frames(5, interval_ms=200)            # folder of JPEGs
frames = robot.capture_frames(5, format="numpy")              # list of numpy arrays
For multi-camera twins, specify a sensor:
wrist = robot.capture_frame("numpy", sensor_id="wrist_cam")
There’s also a camera namespace with convenience methods:
frame = robot.camera.read()               # numpy array (default)
path  = robot.camera.snapshot()            # save JPEG to temp file
path  = robot.camera.snapshot("out.jpg")   # save to a specific path

Video Streaming (WebRTC)

Stream live camera feeds to digital twins using WebRTC. The streaming is initiated directly from the twin object.

Standard Camera

import asyncio
from cyberwave import Cyberwave

cw = Cyberwave()
camera = cw.twin("cyberwave/standard-cam")

async def main():
    try:
        print(f"Streaming to twin {camera.uuid}...")
        await camera.stream_video_background()

        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        await camera.stop_streaming()
        cw.disconnect()

asyncio.run(main())

Intel RealSense (RGB + Depth)

For depth cameras, change the twin name — the SDK handles the rest:
import asyncio
from cyberwave import Cyberwave

cw = Cyberwave()
camera = cw.twin("intel/realsensed455")

async def main():
    try:
        await camera.stream_video_background()

        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        await camera.stop_streaming()
        cw.disconnect()

asyncio.run(main())

Camera Discovery

Discover cameras attached to your device:
from cyberwave.sensor import CV2VideoTrack, CameraConfig, Resolution

supported = CV2VideoTrack.get_supported_resolutions(camera_id=0)
info = CV2VideoTrack.get_camera_info(camera_id=0)

config = CameraConfig(resolution=Resolution.HD, fps=30, camera_id=0)
For RealSense devices:
from cyberwave.sensor import RealSenseDiscovery, RealSenseConfig, Resolution

if RealSenseDiscovery.is_available():
    devices = RealSenseDiscovery.list_devices()
    for dev in devices:
        print(f"{dev.name} (SN: {dev.serial_number})")

    info = RealSenseDiscovery.get_device_info()
    print(f"Color resolutions: {info.get_color_resolutions()}")
    print(f"Depth resolutions: {info.get_depth_resolutions()}")

Workspaces, Projects, and Environments

cw = Cyberwave()

workspaces = cw.workspaces.list()
print(f"Found {len(workspaces)} workspaces")

project = cw.projects.create(
    name="My Robotics Project",
    description="Main robotics project",
    workspace_id=workspaces[0].uuid
)

environment = cw.environments.create(
    name="Development",
    description="Dev environment for testing",
    project_id=project.uuid
)

Asset Catalog

Search and browse pre-built robot assets:
assets = cw.assets.search("so101")

for asset in assets:
    print(f"{asset.registry_id}: {asset.name}")

robot = cw.twin(assets[0].registry_id)
The search returns both public assets (available at cyberwave.com/catalog) and private assets belonging to your organization.

Upload GLB Assets

The SDK supports large GLB uploads by automatically switching to a signed URL flow when files exceed the standard upload limit:
asset = cw.assets.create(
    name="Warehouse Shelf",
    description="Large GLB upload example",
)

updated_asset = cw.assets.upload_glb(asset.uuid, "/path/to/warehouse_shelf.glb")
print(updated_asset.glb_file)

Edge Management

Manage edge devices (Raspberry Pi, Jetson, etc.) that run the Cyberwave Edge Core:
cw = Cyberwave()

edges = cw.edges.list()
for edge in edges:
    print(edge.uuid, edge.name, edge.fingerprint)

edge = cw.edges.get("your-edge-uuid")

edge = cw.edges.create(
    fingerprint="linux-a1b2c3d4e5f60000",
    name="lab-rpi-001",
    workspace_id="your-workspace-uuid",
    metadata={"location": "lab-shelf-2"},
)

edge = cw.edges.update(edge.uuid, {"name": "lab-rpi-001-renamed"})

cw.edges.delete(edge.uuid)
The fingerprint is a stable hardware identifier derived from hostname, OS, architecture, and MAC address. The Edge Core generates and persists it automatically at /etc/cyberwave/fingerprint.json on first boot.

Workflows

List, trigger, and monitor workflows programmatically:
cw = Cyberwave()

for wf in cw.workflows.list():
    print(f"{wf.name} ({wf.uuid}) — {wf.status}")

run = cw.workflows.trigger(
    "workflow-uuid",
    inputs={"target_position": [1.0, 2.0, 0.0], "speed": 0.5},
)

run.wait(timeout=60)
print(run.status, run.result)
You can also start from a Workflow object:
wf = cw.workflows.get("workflow-uuid")
run = wf.trigger(inputs={"speed": 1.0})
run.wait()
List and filter past runs:
runs = cw.workflow_runs.list(workflow_id="workflow-uuid", status="error")
for r in runs:
    print(r.uuid, r.status, r.error)

Alerts

Create, manage, and respond to alerts on a twin. Alerts notify operators that action is needed (e.g., a robot needs calibration or a sensor reading is out of range).
twin = cw.twin(twin_id="your_twin_uuid")

alert = twin.alerts.create(
    name="Calibration needed",
    description="Joint 3 is drifting beyond tolerance",
    severity="warning",
    alert_type="calibration_needed",
)
SeverityDescription
infoInformational notice
warningRequires attention
errorSomething is wrong
criticalImmediate action needed
Manage alert lifecycle:
for a in twin.alerts.list(status="active"):
    print(a.name, a.severity, a.status)

alert.acknowledge()
alert.resolve()
alert.silence()
alert.update(severity="critical")
alert.delete()
To bypass backend deduplication and always create a new alert:
alert = twin.alerts.create(
    name="Calibration needed",
    alert_type="calibration_needed",
    force=True,
)

Environment Previews

Render a static PNG snapshot of an environment:
preview = cw.environments.create_preview("ENVIRONMENT_UUID")
print(preview.file_url)
This calls POST /api/v1/environments/{uuid}/preview and returns attachment metadata including the rendered image URL.

SDK Architecture

The SDK operates across two communication layers:
LayerProtocolOperations
SDK → Cloud (REST)HTTPSCRUD for twins, assets, models, environments, edges, workflows, alerts
SDK → Cloud (MQTT)TCP/MQTTJoint updates, position/rotation/scale, WebRTC signaling, health pings
Cloud → EdgeMQTTCommand forwarding when live robot control is active
Edge → HardwareSerial / USB / ROSPhysical actuation and sensor reading

API Key

Generate your API key

PyPI

View the package on PyPI

GitHub

Source code and examples

REST API Reference

Full REST endpoint documentation

MQTT API Reference

Real-time messaging specification

Edge VLM Tutorial

Build an edge-to-cloud vision pipeline

Live Teleoperation

Control physical robots in real time via the SDK