Transports connect module streams across process boundaries and/or networks.
- Module: a running component (e.g., camera, mapping, nav).
- Stream: a unidirectional flow of messages owned by a module (one broadcaster → many receivers).
- Topic: the name/identifier used by a transport or pubsub backend.
- Message: payload carried on a stream (often
dimos.msgs.*, but can be bytes / images / pointclouds / etc.).
Each edge in the graph is a transported stream (potentially different protocols). Each node is a module:
Modules don’t know or care how data moves. They just:
- emit messages (broadcast)
- subscribe to messages (receive)
A transport is responsible for the mechanics of delivery (IPC, sockets, Redis, ROS 2, etc.).
Important: delivery semantics depend on the backend:
- Some are best-effort (e.g., UDP multicast / LCM): loss can happen.
- Some can be reliable (e.g., TCP-backed, Redis, some DDS configs) but may add latency/backpressure.
So: treat the API as uniform, but pick a backend whose semantics match the task.
Quick view on performance of our pubsub backends:
python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.pyPikchr
color = white
fill = none
linewid = 0.5in
boxwid = 1.0in
boxht = 0.4in
# Boxes with labels
B: box "Blueprints" rad 10px
arrow
M: box "Modules" rad 5px
arrow
T: box "Transports" rad 5px
arrow
P: box "PubSub" rad 5px
# Descriptions below
text "robot configs" at B.s + (0.1, -0.2in)
text "camera, nav" at M.s + (0, -0.2in)
text "LCM, SHM, ROS" at T.s + (0, -0.2in)
text "pub/sub API" at P.s + (0, -0.2in)We’ll go through these layers top-down.
See Blueprints for the blueprint API.
From unitree/go2/blueprints/__init__.py.
Example: rebind a few streams from the default LCMTransport to ROSTransport (defined at transport.py) so you can visualize in rviz2.
nav = autoconnect(
basic,
voxel_mapper(voxel_size=0.1),
cost_mapper(),
replanning_a_star_planner(),
wavefront_frontier_explorer(),
).global_config(n_workers=6, robot_model="unitree_go2")
ros = nav.transports(
{
("lidar", PointCloud2): ROSTransport("lidar", PointCloud2),
("global_map", PointCloud2): ROSTransport("global_map", PointCloud2),
("odom", PoseStamped): ROSTransport("odom", PoseStamped),
("color_image", Image): ROSTransport("color_image", Image),
}
)Each stream on a module can use a different transport. Set .transport on the stream before starting modules.
import time
from dimos.core.module import Module
from dimos.core.stream import In
from dimos.core.transport import LCMTransport
from dimos.hardware.sensors.camera.module import CameraModule
from dimos.msgs.sensor_msgs import Image
from dimos.core.module_coordinator import ModuleCoordinator
class ImageListener(Module):
image: In[Image]
def start(self):
super().start()
self.image.subscribe(lambda img: print(f"Received: {img.shape}"))
if __name__ == "__main__":
# Start local cluster and deploy modules to separate processes
dimos = ModuleCoordinator()
dimos.start()
camera = dimos.deploy(CameraModule, frequency=2.0)
listener = dimos.deploy(ImageListener)
# Choose a transport for the stream (example: LCM typed channel)
camera.color_image.transport = LCMTransport("/camera/rgb", Image)
# Connect listener input to camera output
listener.image.connect(camera.color_image)
dimos.start_all_modules()
time.sleep(2)
dimos.stop()Initialized dimos local cluster with 2 workers, memory limit: auto
2026-01-24T13:17:50.190559Z [info ] Deploying module. [dimos/core/__init__.py] module=CameraModule
2026-01-24T13:17:50.218466Z [info ] Deployed module. [dimos/core/__init__.py] module=CameraModule worker_id=1
2026-01-24T13:17:50.229474Z [info ] Deploying module. [dimos/core/__init__.py] module=ImageListener
2026-01-24T13:17:50.250199Z [info ] Deployed module. [dimos/core/__init__.py] module=ImageListener worker_id=0
Received: (480, 640, 3)
Received: (480, 640, 3)
Received: (480, 640, 3)
See Modules for more on module architecture.
lcmspy shows topic frequency/bandwidth stats:
dimos topic echo /topic listens on typed channels like /topic#pkg.Msg and decodes automatically:
Listening on /camera/rgb (inferring from typed LCM channels like '/camera/rgb#pkg.Msg')... (Ctrl+C to stop)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2026-01-24 20:28:59)At the stream layer, a transport is implemented by subclassing Transport (see core/stream.py) and implementing:
broadcast(...)subscribe(...)
Your Transport.__init__ args can be anything meaningful for your backend:
(ip, port)- a shared-memory segment name
- a filesystem path
- a Redis channel
Encoding is an implementation detail, but we encourage using LCM-compatible message types when possible.
Many of our message types provide lcm_encode / lcm_decode for compact, language-agnostic binary encoding (often faster than pickle). For details, see LCM.
Even though transport can be anything (TCP connection, unix socket) for now all our transport backends implement the PubSub interface.
publish(topic, message)subscribe(topic, callback) -> unsubscribe
from dimos.protocol.pubsub.spec import PubSub
import inspect
print(inspect.getsource(PubSub.publish))
print(inspect.getsource(PubSub.subscribe)) @abstractmethod
def publish(self, topic: TopicT, message: MsgT) -> None:
"""Publish a message to a topic."""
...
@abstractmethod
def subscribe(
self, topic: TopicT, callback: Callable[[MsgT, TopicT], None]
) -> Callable[[], None]:
"""Subscribe to a topic with a callback. returns unsubscribe function"""
...Topic/message types are flexible: bytes, JSON, or our ROS-compatible LCM types. We also have pickle-based transports for arbitrary Python objects.
LCM is UDP multicast. It’s very fast on a robot LAN, but it’s best-effort (packets can drop). For local emission it autoconfigures system in a way in which it's more robust and faster then other more common protocols like ROS, DDS
from dimos.protocol.pubsub.lcmpubsub import LCM, Topic
from dimos.msgs.geometry_msgs import Vector3
lcm = LCM()
lcm.start()
received = []
topic = Topic("/robot/velocity", Vector3)
lcm.subscribe(topic, lambda msg, t: received.append(msg))
lcm.publish(topic, Vector3(1.0, 0.0, 0.5))
import time
time.sleep(0.1)
print(f"Received velocity: x={received[0].x}, y={received[0].y}, z={received[0].z}")
lcm.stop()Received velocity: x=1.0, y=0.0, z=0.5
Shared memory is highest performance, but only works on the same machine.
from dimos.protocol.pubsub.shmpubsub import PickleSharedMemory
shm = PickleSharedMemory(prefer="cpu")
shm.start()
received = []
shm.subscribe("test/topic", lambda msg, topic: received.append(msg))
shm.publish("test/topic", {"data": [1, 2, 3]})
import time
time.sleep(0.1)
print(f"Received: {received}")
shm.stop()Received: [{'data': [1, 2, 3]}]
For network communication, DDS uses the Data Distribution Service (DDS) protocol:
from dataclasses import dataclass
from cyclonedds.idl import IdlStruct
from dimos.protocol.pubsub.impl.ddspubsub import DDS, Topic
@dataclass
class SensorReading(IdlStruct):
value: float
dds = DDS()
dds.start()
received = []
sensor_topic = Topic(name="sensors/temperature", data_type=SensorReading)
dds.subscribe(sensor_topic, lambda msg, t: received.append(msg))
dds.publish(sensor_topic, SensorReading(value=22.5))
import time
time.sleep(0.1)
print(f"Received: {received}")
dds.stop()Received: [SensorReading(value=22.5)]
The simplest toy backend is Memory (single process). Start from there when implementing a new pubsub backend.
from dimos.protocol.pubsub.memory import Memory
bus = Memory()
received = []
unsubscribe = bus.subscribe("sensor/data", lambda msg, topic: received.append(msg))
bus.publish("sensor/data", {"temperature": 22.5})
bus.publish("sensor/data", {"temperature": 23.0})
print(f"Received {len(received)} messages:")
for msg in received:
print(f" {msg}")
unsubscribe()Received 2 messages:
{'temperature': 22.5}
{'temperature': 23.0}
See memory.py for the complete source.
Transports often need to serialize messages before sending and deserialize after receiving.
PubSubEncoderMixin at pubsub/spec.py provides a clean way to add encoding/decoding to any pubsub implementation.
| Mixin | Encoding | Use case |
|---|---|---|
PickleEncoderMixin |
Python pickle | Any Python object, Python-only |
LCMEncoderMixin |
LCM binary | Cross-language (C/C++/Python/Go/…) |
JpegEncoderMixin |
JPEG compressed | Image data, reduces bandwidth |
LCMEncoderMixin is especially useful: you can use LCM message definitions with any transport (not just UDP multicast). See LCM for details.
from dimos.protocol.pubsub.spec import PubSubEncoderMixin
import json
class JsonEncoderMixin(PubSubEncoderMixin[str, dict, bytes]):
def encode(self, msg: dict, topic: str) -> bytes:
return json.dumps(msg).encode("utf-8")
def decode(self, msg: bytes, topic: str) -> dict:
return json.loads(msg.decode("utf-8"))Combine with a pubsub implementation via multiple inheritance:
from dimos.protocol.pubsub.memory import Memory
class MyJsonPubSub(JsonEncoderMixin, Memory):
passSwap serialization by changing the mixin:
from dimos.protocol.pubsub.spec import PickleEncoderMixin
class MyPicklePubSub(PickleEncoderMixin, Memory):
passSee pubsub/test_spec.py for the grid tests your new backend should pass.
Add your backend to benchmarks to compare in context:
python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_benchmark.py| Transport | Use case | Cross-process | Network | Notes |
|---|---|---|---|---|
Memory |
Testing only, single process | No | No | Minimal reference impl |
SharedMemory |
Multi-process on same machine | Yes | No | Highest throughput (IPC) |
LCM |
Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN |
Redis |
Network pubsub via Redis server | Yes | Yes | Central broker; adds hop |
ROS |
ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools |
DDS |
Cyclone DDS without ROS (WIP) | Yes | Yes | WIP |

