Overview
The Cyberwave Python SDK (cyberwave) is a unified client that wraps both the Cyberwave REST API and the MQTT API in a single Python interface. It handles authentication, protocol negotiation, and message serialization, so you can interact with digital twins, control physical robots, stream video, manage workflows, and handle alerts from one client.
| Capability | Protocol | Description |
|---|---|---|
| Digital twin management | REST | Create, query, move, and configure digital twins |
| Joint control | MQTT | Send real-time joint position commands (degrees or radians) |
| Position, rotation, scale | MQTT | Update a twin's 3D pose in the environment |
| Video streaming | WebRTC + MQTT | Stream camera feeds (standard + RealSense) to the cloud |
| Frame capture | WebRTC | Grab individual frames as files, numpy arrays, PIL images, or raw bytes |
| Asset catalog | REST | Search, create, and upload robot assets (including large GLB files) |
| Workspace and project management | REST | List workspaces, create projects and environments |
| Edge management | REST | Register, update, and delete edge devices |
| Workflows | REST | List, trigger, and monitor workflow executions |
| Alerts | REST | Create, acknowledge, resolve, and manage twin alerts |
Installation
Install the SDK from PyPI:

```shell
pip install cyberwave
```

For camera streaming, install the optional extras:

```shell
pip install "cyberwave[camera]"
pip install "cyberwave[realsense]"
```
The `realsense` extra requires the librealsense SDK to be installed on your system. On x86_64 Linux, install it via `sudo apt install librealsense2` or use pre-built wheels; on ARM64 platforms such as Raspberry Pi OS, you must build librealsense from source.
Video streaming also requires FFmpeg:

```shell
# macOS
brew install ffmpeg pkg-config

# Ubuntu
sudo apt-get install ffmpeg
```
Requirements:

- Python 3.10 or higher
- A Cyberwave API key (generate one from your Profile page)
Quick Start
```python
import asyncio
import math

from cyberwave import Cyberwave

cw = Cyberwave(api_key="your_api_key_here")

# ──────────────────────────────────────────────
# 1. Create a digital twin from the asset catalog
# ──────────────────────────────────────────────
robot = cw.twin("the-robot-studio/so101")

# Target a specific environment (optional)
# robot = cw.twin("the-robot-studio/so101", environment_id="ENV_UUID")

# ──────────────────────────────────────────────
# 2. Position, rotate, and scale the twin
# ──────────────────────────────────────────────
robot.edit_position(x=1.0, y=0.0, z=0.5)
robot.edit_rotation(yaw=90)
robot.edit_scale(x=1.5, y=1.5, z=1.5)

# ──────────────────────────────────────────────
# 3. Control joints (degrees or radians)
# ──────────────────────────────────────────────
robot.joints.set("shoulder_joint", 45, degrees=True)
robot.joints.set("elbow_joint", math.pi / 4, degrees=False)
angle = robot.joints.get("shoulder_joint")
all_joints = robot.joints.get_all()
print(f"Joint names: {robot.joints.list()}")

# ──────────────────────────────────────────────
# 4. Capture a camera frame (no streaming setup needed)
# ──────────────────────────────────────────────
path = robot.capture_frame()          # save temp JPEG, returns path
frame = robot.capture_frame("numpy")  # numpy BGR array
image = robot.capture_frame("pil")    # PIL.Image object

# ──────────────────────────────────────────────
# 5. Search the asset catalog
# ──────────────────────────────────────────────
for asset in cw.assets.search("unitree"):
    print(f"{asset.registry_id}: {asset.name}")

# ──────────────────────────────────────────────
# 6. Manage workspaces, projects, and environments
# ──────────────────────────────────────────────
workspaces = cw.workspaces.list()
project = cw.projects.create(
    name="My Project",
    description="Quickstart project",
    workspace_id=workspaces[0].uuid,
)
env = cw.environments.create(
    name="Lab Setup",
    description="Test environment",
    project_id=project.uuid,
)

# ──────────────────────────────────────────────
# 7. Trigger a workflow and wait for completion
# ──────────────────────────────────────────────
run = cw.workflows.trigger("workflow-uuid", inputs={"speed": 0.5})
run.wait(timeout=60)
print(f"Workflow finished: {run.status}")

# ──────────────────────────────────────────────
# 8. Create and manage alerts on a twin
# ──────────────────────────────────────────────
alert = robot.alerts.create(
    name="Calibration needed",
    description="Joint 3 drifting beyond tolerance",
    severity="warning",
    alert_type="calibration_needed",
)
alert.acknowledge()
alert.resolve()

# ──────────────────────────────────────────────
# 9. Stream live video (async)
# ──────────────────────────────────────────────
camera = cw.twin("cyberwave/standard-cam")

async def stream():
    await camera.stream_video_background()
    try:
        while True:
            await asyncio.sleep(1)
    finally:
        await camera.stop_streaming()
        cw.disconnect()

# asyncio.run(stream())

# ──────────────────────────────────────────────
# 10. Manage edge devices
# ──────────────────────────────────────────────
for edge in cw.edges.list():
    print(f"{edge.name} — {edge.uuid}")
```
Authentication
The SDK authenticates using an API key. You can provide it in two ways:
Option 1: Environment variable (recommended):
```shell
export CYBERWAVE_API_KEY=your_api_key_here
```

```python
from cyberwave import Cyberwave

cw = Cyberwave()
```
Option 2: Explicit API key:
```python
from cyberwave import Cyberwave

cw = Cyberwave(api_key="your_api_key_here")
```
You can also set a default environment so cw.twin() always targets the same environment:
```shell
export CYBERWAVE_API_KEY="your_api_key_here"
export CYBERWAVE_ENVIRONMENT_ID="your_environment_uuid"
```
With both set, cw.twin("the-robot-studio/so101") will return the first SO101 twin in that environment, or create one if it doesn’t exist.
1. **Generate an API key.** Go to the Cyberwave dashboard → Profile → API Tokens and create a new key.
2. **Set the environment variable.** Export `CYBERWAVE_API_KEY` in your shell or add it to your `.env` file.
3. **Initialize the client.** Call `Cyberwave()`; the SDK picks up the key and establishes connections to both the REST and MQTT endpoints.
Affect Simulation vs Live Environment
Use cw.affect() to decide whether high-level robot actions should impact the simulation or the live robot.
```python
from cyberwave import Cyberwave

cw = Cyberwave()

cw.affect("simulation")
rover = cw.twin("unitree/go2")
rover.move_forward(distance=1.0)  # moves the digital twin

cw.affect("live")
rover.move_forward(distance=1.0)  # moves the physical robot
```
Accepted values are:
| Value | Effect |
|---|---|
| `"simulation"` | Sends high-level actions to the simulator and updates the digital twin in the Cyberwave environment |
| `"live"` | Sends high-level actions to the live robot through Cyberwave's live control path |
"real-world" is also accepted as an alias for "live".
affect() is chainable, so you can also write cw.affect("simulation").twin("unitree/go2").
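The chaining works because `affect()` returns the client itself. A minimal sketch of the pattern, assuming a default of simulation mode (`FakeClient` and its behavior are illustrative, not the SDK's implementation):

```python
# Illustrative sketch (not the SDK source): a chainable affect() records the
# target mode and returns the client, so calls compose fluently.
class FakeClient:
    def __init__(self):
        self.mode = "simulation"  # hypothetical default

    def affect(self, mode):
        if mode == "real-world":  # accepted alias for "live"
            mode = "live"
        if mode not in ("simulation", "live"):
            raise ValueError(f"unknown mode: {mode}")
        self.mode = mode
        return self  # returning self is what makes chaining possible

    def twin(self, registry_id):
        return f"{registry_id}@{self.mode}"

client = FakeClient()
print(client.affect("simulation").twin("unitree/go2"))  # resolved in simulation mode
print(client.affect("real-world").twin("unitree/go2"))  # alias maps to live mode
```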
Digital Twins
Create or Retrieve a Twin
cw.twin() is the primary method for working with digital twins. Pass an asset registry ID (vendor/model) to create or retrieve a twin:
```python
robot = cw.twin("the-robot-studio/so101")
```
This method:

1. Queries the Asset Catalog (REST) to resolve the asset by registry ID
2. Creates a new twin instance in your environment, or retrieves an existing one
3. Returns a capability-specific Twin class based on the asset's metadata
You can also target a specific environment or retrieve a twin by UUID:
```python
robot = cw.twin("the-robot-studio/so101", environment_id="YOUR_ENVIRONMENT_UUID")
twin = cw.twin(twin_id="YOUR_TWIN_UUID")
```
If no default environment is configured (via CYBERWAVE_ENVIRONMENT_ID or environment_id parameter), the SDK auto-creates a “Quickstart Environment” and places the twin there.
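The resolution flow described above amounts to get-or-create logic. This toy model shows the order of operations; the catalog and environment dicts are stand-ins, not SDK internals:

```python
# Toy model of cw.twin() resolution: resolve the asset, reuse an existing
# twin if one is present, otherwise create one in the given (or auto-created)
# environment.
CATALOG = {"the-robot-studio/so101": {"uuid": "asset-1"}}  # hypothetical catalog

def resolve_twin(registry_id, environments, environment_id=None):
    asset = CATALOG.get(registry_id)
    if asset is None:
        raise KeyError(f"unknown asset: {registry_id}")
    # Fall back to an auto-created default environment when none is given
    if environment_id is None:
        environment_id = "quickstart-env"
    twins = environments.setdefault(environment_id, [])
    # Return the first existing twin of this asset, or create a new one
    for twin in twins:
        if twin["asset"] == asset["uuid"]:
            return twin
    twin = {"asset": asset["uuid"], "env": environment_id}
    twins.append(twin)
    return twin

envs = {}
first = resolve_twin("the-robot-studio/so101", envs)
second = resolve_twin("the-robot-studio/so101", envs)
print(first is second)  # the same twin is reused on the second call
```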
Position, Rotation, and Scale
Update a twin’s 3D pose in the environment:
```python
robot.edit_position(x=1.0, y=0.0, z=0.5)
robot.edit_position([1.0, 0.5, 0.0])
robot.edit_rotation(yaw=90)
robot.edit_scale(x=1.5, y=1.5, z=1.5)
```
Both keyword arguments and list format are supported for edit_position.
Move a Twin Between Environments
```python
robot.add_to_environment("TARGET_ENVIRONMENT_UUID")
```
This operation:

1. Creates a deep copy of the twin in the target environment
2. Marks the original twin as deleted
3. Deletes the source environment if it has no remaining twins
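The three steps above can be modeled with plain dicts. This is a semantic sketch of the documented behavior, not SDK code:

```python
import copy

def move_twin(twin, environments, source_env, target_env):
    """Sketch of the documented move semantics: copy, soft-delete, clean up."""
    # 1. Deep-copy the twin into the target environment
    clone = copy.deepcopy(twin)
    environments[target_env].append(clone)
    # 2. Mark the original as deleted and drop it from the source list
    twin["deleted"] = True
    environments[source_env] = [t for t in environments[source_env] if not t.get("deleted")]
    # 3. Delete the source environment if nothing remains in it
    if not environments[source_env]:
        del environments[source_env]
    return clone

envs = {"src": [{"name": "so101", "deleted": False}], "dst": []}
moved = move_twin(envs["src"][0], envs, "src", "dst")
print(sorted(envs))  # only "dst" survives once "src" is emptied
```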
Joint Control
Send real-time joint commands to a robot arm or any articulated twin. You can use degrees or radians:
```python
import math

cw.affect("simulation")
robot = cw.twin("the-robot-studio/so101")

robot.joints.set("shoulder_joint", 45, degrees=True)
robot.joints.set("elbow_joint", math.pi / 4, degrees=False)

angle = robot.joints.get("shoulder_joint")
joint_names = robot.joints.list()
all_joints = robot.joints.get_all()
```
| Method | Description |
|---|---|
| `joints.set(name, value, degrees=True)` | Set a joint to a target position |
| `joints.get(name)` | Get the current position of a specific joint |
| `joints.list()` | List all joint names for the twin |
| `joints.get_all()` | Get all joint states at once |
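The `degrees` flag presumably just selects the unit before the command is published; the conversion itself is standard. This helper is illustrative, not an SDK function:

```python
import math

def to_radians(value, degrees=True):
    """Normalize a joint target to radians, mirroring the degrees flag above."""
    return math.radians(value) if degrees else value

print(to_radians(45, degrees=True))            # 45 degrees expressed in radians (~0.785)
print(to_radians(math.pi / 4, degrees=False))  # already radians, passed through unchanged
```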
Frame Capture
Capture the latest camera frame from a twin without setting up a full video stream:
```python
robot = cw.twin("the-robot-studio/so101")

path = robot.capture_frame()          # temp JPEG file path
frame = robot.capture_frame("numpy")  # numpy BGR array (requires numpy + opencv-python)
image = robot.capture_frame("pil")    # PIL.Image (requires Pillow)
raw = robot.capture_frame("bytes")    # raw JPEG bytes
```
Batch capture multiple frames:
```python
folder = robot.capture_frames(5, interval_ms=200)  # folder of JPEGs
frames = robot.capture_frames(5, format="numpy")   # list of numpy arrays
```
For multi-camera twins, specify a sensor:
```python
wrist = robot.capture_frame("numpy", sensor_id="wrist_cam")
```
There’s also a camera namespace with convenience methods:
```python
frame = robot.camera.read()              # numpy array (default)
path = robot.camera.snapshot()           # save JPEG to temp file
path = robot.camera.snapshot("out.jpg")  # save to a specific path
```
Video Streaming (WebRTC)
Stream live camera feeds to digital twins using WebRTC. The streaming is initiated directly from the twin object.
Standard Camera
```python
import asyncio

from cyberwave import Cyberwave

cw = Cyberwave()
camera = cw.twin("cyberwave/standard-cam")

async def main():
    try:
        print(f"Streaming to twin {camera.uuid}...")
        await camera.stream_video_background()
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        await camera.stop_streaming()
        cw.disconnect()

asyncio.run(main())
```
Intel RealSense (RGB + Depth)
For depth cameras, change the twin name — the SDK handles the rest:
```python
import asyncio

from cyberwave import Cyberwave

cw = Cyberwave()
camera = cw.twin("intel/realsensed455")

async def main():
    try:
        await camera.stream_video_background()
        while True:
            await asyncio.sleep(1)
    except (KeyboardInterrupt, asyncio.CancelledError):
        pass
    finally:
        await camera.stop_streaming()
        cw.disconnect()

asyncio.run(main())
```
Camera Discovery
Discover cameras attached to your device:
```python
from cyberwave.sensor import CV2VideoTrack, CameraConfig, Resolution

supported = CV2VideoTrack.get_supported_resolutions(camera_id=0)
info = CV2VideoTrack.get_camera_info(camera_id=0)
config = CameraConfig(resolution=Resolution.HD, fps=30, camera_id=0)
```
For RealSense devices:
```python
from cyberwave.sensor import RealSenseDiscovery, RealSenseConfig, Resolution

if RealSenseDiscovery.is_available():
    devices = RealSenseDiscovery.list_devices()
    for dev in devices:
        print(f"{dev.name} (SN: {dev.serial_number})")

    info = RealSenseDiscovery.get_device_info()
    print(f"Color resolutions: {info.get_color_resolutions()}")
    print(f"Depth resolutions: {info.get_depth_resolutions()}")
```
Workspaces, Projects, and Environments
```python
cw = Cyberwave()

workspaces = cw.workspaces.list()
print(f"Found {len(workspaces)} workspaces")

project = cw.projects.create(
    name="My Robotics Project",
    description="Main robotics project",
    workspace_id=workspaces[0].uuid,
)

environment = cw.environments.create(
    name="Development",
    description="Dev environment for testing",
    project_id=project.uuid,
)
```
Asset Catalog
Search and browse pre-built robot assets:
```python
assets = cw.assets.search("so101")
for asset in assets:
    print(f"{asset.registry_id}: {asset.name}")

robot = cw.twin(assets[0].registry_id)
```
The search returns both public assets (available at cyberwave.com/catalog ) and private assets belonging to your organization.
Upload GLB Assets
The SDK supports large GLB uploads by automatically switching to a signed URL flow when files exceed the standard upload limit:
```python
asset = cw.assets.create(
    name="Warehouse Shelf",
    description="Large GLB upload example",
)
updated_asset = cw.assets.upload_glb(asset.uuid, "/path/to/warehouse_shelf.glb")
print(updated_asset.glb_file)
```
Edge Management
Manage edge devices (Raspberry Pi, Jetson, etc.) that run the Cyberwave Edge Core:
```python
cw = Cyberwave()

edges = cw.edges.list()
for edge in edges:
    print(edge.uuid, edge.name, edge.fingerprint)

edge = cw.edges.get("your-edge-uuid")

edge = cw.edges.create(
    fingerprint="linux-a1b2c3d4e5f60000",
    name="lab-rpi-001",
    workspace_id="your-workspace-uuid",
    metadata={"location": "lab-shelf-2"},
)

edge = cw.edges.update(edge.uuid, {"name": "lab-rpi-001-renamed"})
cw.edges.delete(edge.uuid)
```
The fingerprint is a stable hardware identifier derived from hostname, OS, architecture, and MAC address. The Edge Core generates and persists it automatically at /etc/cyberwave/fingerprint.json on first boot.
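The exact derivation is not specified here, but a stable identifier of that shape can be sketched by hashing the listed fields. The hashing scheme and truncation below are assumptions made for illustration; only the inputs (hostname, OS, architecture, MAC address) come from the text:

```python
import hashlib
import platform
import uuid

def make_fingerprint():
    """Illustrative fingerprint: hash hostname, OS, architecture, and MAC.

    The real Edge Core derivation may differ; this only shows how the listed
    inputs can yield a stable ID shaped like "linux-a1b2c3d4e5f60000".
    """
    mac = uuid.getnode()  # 48-bit MAC address as an integer
    material = f"{platform.node()}|{platform.system()}|{platform.machine()}|{mac:012x}"
    digest = hashlib.sha256(material.encode()).hexdigest()[:16]
    return f"{platform.system().lower()}-{digest}"

fp = make_fingerprint()
print(fp)  # stable across calls on the same machine, since all inputs are fixed
```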
Workflows
List, trigger, and monitor workflows programmatically:
```python
cw = Cyberwave()

for wf in cw.workflows.list():
    print(f"{wf.name} ({wf.uuid}) — {wf.status}")

run = cw.workflows.trigger(
    "workflow-uuid",
    inputs={"target_position": [1.0, 2.0, 0.0], "speed": 0.5},
)
run.wait(timeout=60)
print(run.status, run.result)
```
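A `wait(timeout=...)` call like the one above typically polls the run's status until it reaches a terminal state or the timeout elapses. A minimal sketch of that loop, where `fetch_status` and the state names are hypothetical stand-ins for the SDK's internals:

```python
import time

def wait_for_run(fetch_status, timeout=60, poll_interval=0.5):
    """Poll fetch_status() until a terminal state is reached or timeout hits."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status in ("success", "error", "cancelled"):  # hypothetical terminal states
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"run still '{status}' after {timeout}s")

# Stub that "completes" on the third poll
statuses = iter(["pending", "running", "success"])
print(wait_for_run(lambda: next(statuses), timeout=5, poll_interval=0.01))
```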
You can also start from a Workflow object:
```python
wf = cw.workflows.get("workflow-uuid")
run = wf.trigger(inputs={"speed": 1.0})
run.wait()
```
List and filter past runs:
```python
runs = cw.workflow_runs.list(workflow_id="workflow-uuid", status="error")
for r in runs:
    print(r.uuid, r.status, r.error)
```
Alerts
Create, manage, and respond to alerts on a twin. Alerts notify operators that action is needed (e.g., a robot needs calibration or a sensor reading is out of range).
```python
twin = cw.twin(twin_id="your_twin_uuid")

alert = twin.alerts.create(
    name="Calibration needed",
    description="Joint 3 is drifting beyond tolerance",
    severity="warning",
    alert_type="calibration_needed",
)
```
| Severity | Description |
|---|---|
| `info` | Informational notice |
| `warning` | Requires attention |
| `error` | Something is wrong |
| `critical` | Immediate action needed |
Manage alert lifecycle:
```python
for a in twin.alerts.list(status="active"):
    print(a.name, a.severity, a.status)

alert.acknowledge()
alert.resolve()
alert.silence()
alert.update(severity="critical")
alert.delete()
```
To bypass backend deduplication and always create a new alert:
```python
alert = twin.alerts.create(
    name="Calibration needed",
    alert_type="calibration_needed",
    force=True,
)
```
Environment Previews
Render a static PNG snapshot of an environment:
```python
preview = cw.environments.create_preview("ENVIRONMENT_UUID")
print(preview.file_url)
```
This calls POST /api/v1/environments/{uuid}/preview and returns attachment metadata including the rendered image URL.
SDK Architecture
The SDK operates across two communication layers:
| Layer | Protocol | Operations |
|---|---|---|
| SDK → Cloud (REST) | HTTPS | CRUD for twins, assets, models, environments, edges, workflows, alerts |
| SDK → Cloud (MQTT) | TCP/MQTT | Joint updates, position/rotation/scale, WebRTC signaling, health pings |
| Cloud → Edge | MQTT | Command forwarding when live robot control is active |
| Edge → Hardware | Serial / USB / ROS | Physical actuation and sensor reading |
- **API Key**: Generate your API key
- **PyPI**: View the package on PyPI
- **GitHub**: Source code and examples
- **REST API Reference**: Full REST endpoint documentation
- **MQTT API Reference**: Real-time messaging specification
- **Edge VLM Tutorial**: Build an edge-to-cloud vision pipeline
- **Live Teleoperation**: Control physical robots in real time via the SDK