The Cyberwave Python SDK (cyberwave) is a unified client that wraps both the Cyberwave REST API and MQTT API into a single Python interface. It handles authentication, protocol negotiation, and message serialization so you can interact with digital twins, control physical robots, stream video, manage workflows, and handle alerts, all using Python.
Capability
Protocol
Description
Digital Twin management
REST
Create, query, move, and configure digital twins
Joint control
MQTT
Send real-time joint position commands (degrees or radians)
Position, rotation, scale
MQTT
Update a twin’s 3D pose in the environment
Video streaming
WebRTC + MQTT
Stream camera feeds (standard + RealSense) to the cloud
Frame capture
WebRTC
Grab individual frames as files, numpy arrays, PIL images, or raw bytes
Asset catalog
REST
Search, create, and upload robot assets (including large GLB files)
Workspace and project management
REST
List workspaces, create projects and environments
Edge management
REST
Register, update, and delete edge devices
Workflows
REST
List, trigger, and monitor workflow executions
Alerts
REST
Create, acknowledge, resolve, and manage twin alerts
For camera streaming, install the optional extras:
Standard Cameras (OpenCV)
Intel RealSense (RGB + Depth)
pip install cyberwave[camera]
pip install cyberwave[realsense]
On ARM64/Raspberry Pi, you’ll also need the librealsense SDK installed on your system. On x86_64, install it via sudo apt install librealsense2 or use pre-built wheels. On Raspberry Pi OS (ARM64), you must build librealsense from source.
Use cw.affect() to decide whether high-level robot actions should impact the simulation or the live robot.
from cyberwave import Cyberwavecw = Cyberwave()cw.affect("simulation")rover = cw.twin("unitree/go2")rover.move_forward(distance=1.0) # moves the digital twincw.affect("live")rover.move_forward(distance=1.0) # moves the physical robot
Accepted values are:
Value
Effect
"simulation"
Sends high-level actions to the simulator and updates the digital twin in the Cyberwave environment
"live"
Sends high-level actions to the live robot through Cyberwave’s live control path
"real-world" is also accepted as an alias for "live".affect() is chainable, so you can also write cw.affect("simulation").twin("unitree/go2").
cw.twin() is the primary method for working with digital twins. Pass an asset registry ID (vendor/model) to create or retrieve a twin:
robot = cw.twin("the-robot-studio/so101")
This method:
Queries the Asset Catalog (REST) to resolve the asset by registry ID
Creates a new twin instance in your environment, or retrieves an existing one
Returns a capability-specific Twin class based on the asset’s metadata
You can also target a specific environment or retrieve a twin by UUID or unified slug:
# By UUIDrobot = cw.twin("the-robot-studio/so101", environment_id="YOUR_ENVIRONMENT_UUID")twin = cw.twin(twin_id="YOUR_TWIN_UUID")# By unified slugrobot = cw.twin("acme/catalog/so101", environment_id="acme/envs/production")twin = cw.twin(twin_id="acme/twins/my-arm")
Slugs and UUIDs are interchangeable wherever an identifier is accepted. Access an entity’s slug via the .slug property (e.g. twin.slug).If no default environment is configured (via CYBERWAVE_ENVIRONMENT_ID or environment_id parameter), the SDK auto-creates a “Quickstart Environment” and places the twin there.
Send real-time joint commands to a robot arm or any articulated twin. You can use degrees or radians:
import mathcw.affect("simulation")robot = cw.twin("the-robot-studio/so101")robot.joints.set("shoulder_pan", 45, degrees=True)robot.joints.set("elbow_flex", math.pi/4, degrees=False)angle = robot.joints.get("shoulder_pan")joint_names = robot.joints.list()# Get all joint states as a dict {name: radians}all_joints = robot.joints.get_all()# Print a human-readable table of all joint states (radians + degrees)robot.joints.print_joint_states()
print_joint_states() fetches the latest state from the server and prints a formatted table:
Joint states for twin <twin-uuid>:------------------------------------------------------Joint Radians Degrees------------------------------------------------------elbow_flex 0.0000 rad 0.00 °shoulder_pan 0.7854 rad 45.00 °------------------------------------------------------
Method
Description
joints.set(name, value, degrees=True)
Set a joint to a target position
joints.get(name)
Get the current position of a specific joint (radians)
joints.list()
List all joint names for the twin
joints.get_all()
Get all joint states as a dict[name, radians]
joints.print_joint_states()
Print all joint states in a human-readable table (radians + degrees)
There’s also a camera namespace with convenience methods:
frame = robot.camera.read() # numpy array (default)path = robot.camera.snapshot() # save JPEG to temp filepath = robot.camera.snapshot("out.jpg") # save to a specific path
The fingerprint is a stable hardware identifier derived from hostname, OS, architecture, and MAC address. The Edge Core generates and persists it automatically at ~/.cyberwave/fingerprint.json on first boot.
List, trigger, and monitor workflows programmatically:
cw = Cyberwave()for wf in cw.workflows.list(): print(f"{wf.name} ({wf.slug}) — {wf.status}")# Trigger by UUID or unified slugrun = cw.workflows.trigger( "acme/workflows/pick-and-place", inputs={"target_position": [1.0, 2.0, 0.0], "speed": 0.5},)run.wait(timeout=60)print(run.status, run.result)
You can also start from a Workflow object:
# Get by UUID or slugwf = cw.workflows.get("acme/workflows/pick-and-place")run = wf.trigger(inputs={"speed": 1.0})run.wait()# Dedicated slug lookupwf = cw.workflows.get_by_slug("acme/workflows/pick-and-place")
List and filter past runs:
runs = cw.workflow_runs.list(workflow_id="workflow-uuid", status="error")for r in runs: print(r.uuid, r.status, r.error)
Check if a workflow is currently running:
wf = cw.workflows.get("workflow-uuid")if wf.is_running(): print("Workflow is currently executing")# Or using the manager directlyif cw.workflows.is_running("workflow-uuid"): print("Workflow is currently executing")
is_running() returns True when any run has status running, waiting, or requested.
Create, manage, and respond to alerts on a twin. Alerts notify operators that action is needed (e.g., a robot needs calibration or a sensor reading is out of range).
for a in twin.alerts.list(status="active"): print(a.name, a.severity, a.status)alert.acknowledge()alert.resolve()alert.silence()alert.update(severity="critical")alert.delete()
To bypass backend deduplication and always create a new alert: