Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions .agents/workflows/deploy-to-remote.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
---
description: How to deploy ProtoMQ to a remote server and validate it
---

# ProtoMQ Remote Deployment Guide

This workflow guides agents on how to deploy the ProtoMQ server to a remote machine, specifically configuring it to run as a systemd service, and how to validate that the deployment is working correctly.

> [!IMPORTANT]
> Do not assume any prior knowledge of the target environment.

## Prerequisites
- Ask the user which SSH connection string to use (e.g., `<ssh_user>@<remote_host>`).

## Deployment Steps

1. **Verify Zig Dependency**:
Search for the `zig` binary on the remote machine (e.g., `ssh <ssh_target> "which zig"` or `ssh <ssh_target> "find / -name zig 2>/dev/null | grep bin/zig"`).
If `zig` is not found, **STOP** and tell the user to install Zig on the remote machine before proceeding.

2. **Clone Repository to `/opt/protomq`**:
Connect via the provided SSH connection and create the `/opt/protomq` directory, ensuring appropriate permissions, then clone the repository there.
```bash
ssh <ssh_target> "sudo mkdir -p /opt/protomq && sudo chown \$USER /opt/protomq && git clone https://github.com/electricalgorithm/protomq /opt/protomq || (cd /opt/protomq && git fetch --all)"
```

3. **Checkout and Pull**:
Checkout the correct branch and pull the latest changes.
```bash
ssh <ssh_target> "cd /opt/protomq && git checkout <branch_name> && git pull"
```

4. **Build and Install the Application**:
Build the Zig application on the remote server using the located `zig` binary. Ensure you build with the `-Dadmin_server=true` flag.
Use the `--prefix /opt/protomq` flag so that it installs the `bin/` files and the systemd service file into `/opt/protomq`.
```bash
ssh <ssh_target> "cd /opt/protomq && sudo <path_to_zig_binary> build -Doptimize=ReleaseSafe -Dadmin_server=true --prefix /opt/protomq"
```

5. **Configure systemd Service**:
Since the build step installed the service file directly into `/opt/protomq/etc/systemd/system/protomq.service`, simply link it to the system bus, reload, and start it.
```bash
ssh <ssh_target> "sudo ln -sf /opt/protomq/etc/systemd/system/protomq.service /etc/systemd/system/protomq.service && sudo systemctl daemon-reload && sudo systemctl enable --now protomq && sudo systemctl restart protomq"
```

6. **Verify Service Status**:
Ensure the service is actively running.
```bash
ssh <ssh_target> "systemctl status protomq"
```
It should say `Active: active (running)`.

## Validation Steps

### 1. Local MQTT Client Validation
Send a ProtoMQ request from the **host machine** (the machine you are running on) to the remote machine to verify basic functionality using the `protomq-cli` tool.
First, build the project locally if necessary, then run the CLI (ensure you use the correct IP/host of the remote machine).
```bash
./zig-out/bin/protomq-cli connect --host <remote_host>
```
*(You can also use `publish` or `subscribe` commands with `-t <topic>` to test actual message flow).*

### 2. Admin Server Validation
If the Admin Server is enabled, it will listen on `127.0.0.1:8080` on the remote server. Validate the endpoints directly on the remote machine over SSH using the default authorization token (`admin_secret` or check `ADMIN_TOKEN`):

- **Metrics Endpoint**:
```bash
ssh <ssh_target> 'curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/metrics'
```
*Expected Output*: JSON with connections, messages, schemas, etc. `{"connections":0,"messages_routed":0,"schemas":1,"memory_mb":0}`

- **Schemas Endpoint**:
```bash
ssh <ssh_target> 'curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/api/v1/schemas'
```
*Expected Output*: Topic-schema mapping JSON. e.g., `{"sensor/data":"SensorData"}`

If all responses match expectations and the remote CLI connection succeeds, the server is healthy and successfully deployed.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ protomq-cli discover --proto-dir schemas
```
This allows clients to "bootstrap" themselves without needing pre-shared `.proto` files.

### Admin Server

ProtoMQ includes an optional HTTP Admin Server for runtime observability and dynamic schema management without polluting the core MQTT hot-paths.

- **Dynamic Schema Registration**: Register `.proto` files at runtime via `POST /api/v1/schemas`.
- **Telemetry**: Monitor active connections, message throughput, and schemas via `GET /metrics`.
- **Zero Overhead Footprint**: The Admin Server is disabled by default to preserve the absolute minimum memory footprint for embedded devices. It is strictly conditionally compiled via the `zig build -Dadmin_server=true` flag. Enabling it moderately increases the initial static memory baseline (e.g., from ~2.6 MB to ~4.0 MB) by safely running a parallel HTTP listener, but it executes cooperatively on the same event loop ensuring zero degradation to per-message MQTT performance. When the flag is deactivated, it incurs **zero overhead footprint**.

### Performance Results

ProtoMQ delivers high performance across both high-end and edge hardware:
Expand Down
5 changes: 4 additions & 1 deletion benchmarks/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ name = "protomq-bench-suite"
version = "0.1.0"
description = "ProtoMQ benchmark suite scripts"
requires-python = ">=3.11"
dependencies = []
dependencies = [
"psutil>=7.2.2",
]

[project.scripts]
protomq-bench-b1 = "b1_baseline_concurrency.benchmark:main"
Expand All @@ -23,6 +25,7 @@ packages = [
"b5_protobuf_load",
"b6_connection_churn",
"b7_message_sizes",
"common/protomq_benchmarks",
]

[build-system]
Expand Down
36 changes: 35 additions & 1 deletion benchmarks/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,22 @@ pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});

const admin_server = b.option(bool, "admin_server", "Enable the HTTP Admin Server") orelse false;

const options = b.addOptions();
options.addOption(bool, "admin_server", admin_server);
const options_module = options.createModule();

// Server executable
const server_exe = b.addExecutable(.{
.name = "protomq-server",
.root_module = b.createModule(.{
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
.imports = &.{
.{ .name = "build_options", .module = options_module },
},
}),
});
b.installArtifact(server_exe);
Expand All @@ -26,6 +35,15 @@ pub fn build(b: *std.Build) void {
});
b.installArtifact(client_exe);

// Install systemd service if building for Linux
if (target.result.os.tag == .linux) {
b.getInstallStep().dependOn(&b.addInstallFileWithDir(
b.path("deploy/systemd/protomq.service"),
.prefix,
"etc/systemd/system/protomq.service",
).step);
}

// Run command for server
const run_server_cmd = b.addRunArtifact(server_exe);
run_server_cmd.step.dependOn(b.getInstallStep());
Expand All @@ -50,6 +68,9 @@ pub fn build(b: *std.Build) void {
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
.imports = &.{
.{ .name = "build_options", .module = options_module },
},
}),
});
const run_unit_tests = b.addRunArtifact(unit_tests);
Expand Down
14 changes: 14 additions & 0 deletions deploy/systemd/protomq.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[Unit]
Description=ProtoMQ Server
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/protomq
ExecStart=/opt/protomq/bin/protomq-server
Restart=always
RestartSec=5

[Install]
WantedBy=multi-user.target
2 changes: 2 additions & 0 deletions src/broker/broker.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ const std = @import("std");
pub const TopicBroker = struct {
allocator: std.mem.Allocator,
subscriptions: std.StringHashMap(SubscriberList),
total_messages_routed: u64,

const SubscriberList = std.ArrayList(usize); // List of client IDs

pub fn init(allocator: std.mem.Allocator) TopicBroker {
return TopicBroker{
.allocator = allocator,
.subscriptions = std.StringHashMap(SubscriberList).init(allocator),
.total_messages_routed = 0,
};
}

Expand Down
1 change: 1 addition & 0 deletions src/broker/mqtt_handler.zig
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ pub const MqttHandler = struct {
std.debug.print(" ⚠ Failed to forward to client {}: {}\n", .{ sub_index, err });
continue;
};
broker.total_messages_routed += 1;
std.debug.print(" → Forwarded to client {}\n", .{sub_index});
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/client/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ pub const MqttClient = struct {
}

/// Loop to receive messages (Blocking)
/// Calls callback with (topic, message)
pub fn run(self: *MqttClient, callback: *const fn (topic: []const u8, message: []const u8) void) !void {
/// Calls callback with (context, topic, message)
pub fn run(self: *MqttClient, context: *anyopaque, callback: *const fn (ctx: *anyopaque, topic: []const u8, message: []const u8) void) !void {
if (self.connection == null) return error.NotConnected;

while (self.connection.?.isActive()) {
Expand All @@ -175,7 +175,7 @@ pub const MqttClient = struct {
// Use parser to parse PUBLISH packet
// Parser.parsePublish works for incoming PUBLISH
const publish_pkt = try self.parser.parsePublish(buffer);
callback(publish_pkt.topic, publish_pkt.payload);
callback(context, publish_pkt.topic, publish_pkt.payload);
} else if (header.packet_type == .PINGRESP) {
// Ignore
}
Expand Down
Loading