From 060a5fabbdfc4bd2ec4c83e5542797372000e3f0 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:55:54 +0200 Subject: [PATCH 1/2] DPL: add status endpoint to the driver --- Framework/Core/CMakeLists.txt | 1 + Framework/Core/include/Framework/DeviceInfo.h | 3 + .../Core/src/ControlWebSocketHandler.cxx | 5 + Framework/Core/src/DPLWebSocket.cxx | 10 +- Framework/Core/src/DPLWebSocket.h | 1 + Framework/Core/src/DriverServerContext.h | 5 + Framework/Core/src/StatusWebSocketHandler.cxx | 503 ++++++++++++++++++ Framework/Core/src/StatusWebSocketHandler.h | 101 ++++ Framework/Core/src/runDataProcessing.cxx | 7 + 9 files changed, 635 insertions(+), 1 deletion(-) create mode 100644 Framework/Core/src/StatusWebSocketHandler.cxx create mode 100644 Framework/Core/src/StatusWebSocketHandler.h diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index c311ba980a20b..0e67e1c0cc623 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -158,6 +158,7 @@ o2_add_library(Framework src/StepTHn.cxx src/Base64.cxx src/DPLWebSocket.cxx + src/StatusWebSocketHandler.cxx src/TimerParamSpec.cxx test/TestClasses.cxx TARGETVARNAME targetName diff --git a/Framework/Core/include/Framework/DeviceInfo.h b/Framework/Core/include/Framework/DeviceInfo.h index ef93ca83ca03f..bc3e895a3d8ed 100644 --- a/Framework/Core/include/Framework/DeviceInfo.h +++ b/Framework/Core/include/Framework/DeviceInfo.h @@ -61,6 +61,9 @@ struct DeviceInfo { std::string lastError; /// An unterminated string which is not ready to be printed yet std::string unprinted; + /// Total number of log lines ever stored in history (monotonically increasing). + /// Used by status clients to track which lines they have already sent. + size_t logSeq = 0; /// Whether the device is active (running) or not. bool active; /// Whether the device is ready to quit. 
diff --git a/Framework/Core/src/ControlWebSocketHandler.cxx b/Framework/Core/src/ControlWebSocketHandler.cxx index 6d7926918a8c7..35528a1d6dfec 100644 --- a/Framework/Core/src/ControlWebSocketHandler.cxx +++ b/Framework/Core/src/ControlWebSocketHandler.cxx @@ -11,6 +11,7 @@ #include "ControlWebSocketHandler.h" #include "DriverServerContext.h" +#include "StatusWebSocketHandler.h" #include "Framework/DeviceMetricsHelper.h" #include "Framework/ServiceMetricsInfo.h" #include @@ -83,6 +84,10 @@ void ControlWebSocketHandler::endChunk() for (auto& callback : *mContext.metricProcessingCallbacks) { callback(mContext.registry, ServiceMetricsInfo{*mContext.metrics, *mContext.specs, *mContext.infos, mContext.driver->metrics, *mContext.driver}, timestamp); } + // Notify status clients before changed flags are reset so they can see what changed. + for (auto* statusHandler : mContext.statusHandlers) { + statusHandler->sendUpdate(mIndex); + } for (auto& metricsInfo : *mContext.metrics) { std::fill(metricsInfo.changed.begin(), metricsInfo.changed.end(), false); } diff --git a/Framework/Core/src/DPLWebSocket.cxx b/Framework/Core/src/DPLWebSocket.cxx index d9b6594d5f07c..06de46b387c29 100644 --- a/Framework/Core/src/DPLWebSocket.cxx +++ b/Framework/Core/src/DPLWebSocket.cxx @@ -18,6 +18,7 @@ #include "DriverServerContext.h" #include "DriverClientContext.h" #include "ControlWebSocketHandler.h" +#include "StatusWebSocketHandler.h" #include "HTTPParser.h" #include #include @@ -193,9 +194,10 @@ void WSDPLHandler::method(std::string_view const& s) void WSDPLHandler::target(std::string_view const& s) { - if (s != "/") { + if (s != "/" && s != "/status") { throw WSError{404, "Unknown"}; } + mTarget = s; } void populateHeader(std::map& headers, std::string_view const& k, std::string_view const& v) @@ -294,6 +296,12 @@ void WSDPLHandler::endHeaders() break; } } + } else if (mTarget == "/status" && mServerContext->isDriver) { + LOGP(info, "Status client connected ({} total)", 
mServerContext->statusHandlers.size() + 1); + auto* statusHandler = new StatusWebSocketHandler(*mServerContext, this); + mServerContext->statusHandlers.push_back(statusHandler); + mHandler = std::unique_ptr(statusHandler); + mHandler->headers(mHeaders); } else { if ((mServerContext->isDriver && getenv("DPL_DRIVER_REMOTE_GUI")) || ((mServerContext->isDriver == false) && getenv("DPL_DEVICE_REMOTE_GUI"))) { LOG(info) << "Connection not bound to a PID"; diff --git a/Framework/Core/src/DPLWebSocket.h b/Framework/Core/src/DPLWebSocket.h index 43ec27a6b54f0..1985c37157d65 100644 --- a/Framework/Core/src/DPLWebSocket.h +++ b/Framework/Core/src/DPLWebSocket.h @@ -62,6 +62,7 @@ struct WSDPLHandler : public HTTPParser { bool mHandshaken = false; uv_stream_t* mStream = nullptr; std::map mHeaders; + std::string mTarget; DriverServerContext* mServerContext; }; diff --git a/Framework/Core/src/DriverServerContext.h b/Framework/Core/src/DriverServerContext.h index 4d25c47bd172b..c9f2c80165d92 100644 --- a/Framework/Core/src/DriverServerContext.h +++ b/Framework/Core/src/DriverServerContext.h @@ -29,6 +29,7 @@ namespace o2::framework struct DriverInfo; struct ServiceRegistry; struct GuiCallbackContext; +struct StatusWebSocketHandler; struct DriverServerContext { ServiceRegistryRef registry; @@ -49,6 +50,10 @@ struct DriverServerContext { /// or something like that. bool isDriver = false; + /// Connected MCP/status clients. Updated by StatusWebSocketHandler + /// on connect/disconnect; notified by ControlWebSocketHandler::endChunk(). + std::vector statusHandlers; + /// The handle to the server component of the /// driver. uv_tcp_t serverHandle; diff --git a/Framework/Core/src/StatusWebSocketHandler.cxx b/Framework/Core/src/StatusWebSocketHandler.cxx new file mode 100644 index 0000000000000..db715eff6592d --- /dev/null +++ b/Framework/Core/src/StatusWebSocketHandler.cxx @@ -0,0 +1,503 @@ +// Copyright 2019-2026 CERN and copyright holders of ALICE O2. 
/// Escape @a s so that it can be placed between double quotes in a JSON document.
///
/// Quotes and backslashes are backslash-escaped, newline / carriage return /
/// tab use their short escapes, and every other byte below 0x20 is written as
/// a \u00XX sequence. All remaining bytes (including those >= 0x80) are copied
/// through unchanged.
///
/// @param s text to escape
/// @return the escaped text, without surrounding quotes
std::string jsonEscape(std::string_view s)
{
  std::string out;
  out.reserve(s.size() + 4);
  // Iterate as unsigned char so that bytes >= 0x80 are not seen as negative
  // values by the "< 0x20" control-character test below.
  for (unsigned char c : s) {
    switch (c) {
      case '"':
        out += "\\\"";
        break;
      case '\\':
        out += "\\\\";
        break;
      case '\n':
        out += "\\n";
        break;
      case '\r':
        out += "\\r";
        break;
      case '\t':
        out += "\\t";
        break;
      default:
        if (c < 0x20) {
          // "\u" + 4 hex digits + terminator = 7 bytes, fits in buf.
          char buf[8];
          snprintf(buf, sizeof(buf), "\\u%04x", c);
          out += buf;
        } else {
          out += static_cast<char>(c);
        }
    }
  }
  return out;
}
"unknown"; + } +} + +char const* streamingStateName(StreamingState s) +{ + switch (s) { + case StreamingState::Streaming: + return "Streaming"; + case StreamingState::EndOfStreaming: + return "EndOfStreaming"; + case StreamingState::Idle: + return "Idle"; + default: + return "Unknown"; + } +} + +void appendMetricValue(std::string& out, DeviceMetricsInfo const& info, size_t mi) +{ + auto const& metric = info.metrics[mi]; + if (metric.pos == 0) { + out += "null"; + return; + } + size_t last = (metric.pos - 1) % metricStorageSize(metric.type); + switch (metric.type) { + case MetricType::Int: + out += std::to_string(info.intMetrics[metric.storeIdx][last]); + break; + case MetricType::Float: { + char buf[32]; + snprintf(buf, sizeof(buf), "%g", static_cast(info.floatMetrics[metric.storeIdx][last])); + out += buf; + break; + } + case MetricType::Uint64: + out += std::to_string(info.uint64Metrics[metric.storeIdx][last]); + break; + default: + out += "null"; + } +} + +/// Extract the value of a simple string field from a flat JSON object. +/// e.g. extractField(R"({"cmd":"subscribe","device":"prod"})", "device") → "prod" +/// Returns empty string_view if not found. +std::string_view extractStringField(std::string_view json, std::string_view key) +{ + std::string needle; + needle += '"'; + needle += key; + needle += "\":"; + auto pos = json.find(needle); + if (pos == std::string_view::npos) { + return {}; + } + pos += needle.size(); + // skip optional whitespace between ':' and '"' + while (pos < json.size() && json[pos] == ' ') { + ++pos; + } + if (pos >= json.size() || json[pos] != '"') { + return {}; + } + ++pos; // skip opening quote + auto end = json.find('"', pos); + if (end == std::string_view::npos) { + return {}; + } + return json.substr(pos, end - pos); +} + +/// Extract the raw value of an array field from a flat JSON object. +/// e.g. 
/// Extract the raw text of an array-valued field from a flat JSON object.
/// e.g. extractArrayField(R"({"metrics":["a","b"]})", "metrics") → R"(["a","b"])"
///
/// The field is located by searching for `"<key>":`; whitespace between the
/// colon and the opening bracket is skipped. Brackets that appear inside
/// quoted strings (and escaped quotes inside those strings) do not affect the
/// nesting count.
///
/// @param json the JSON text to search
/// @param key  field name, without quotes
/// @return a view into @a json spanning the array including both brackets, or
///         an empty view if the key is missing, its value is not an array, or
///         the array is not terminated
std::string_view extractArrayField(std::string_view json, std::string_view key)
{
  std::string needle;
  needle.reserve(key.size() + 3);
  needle += '"';
  needle += key;
  needle += "\":";
  auto pos = json.find(needle);
  if (pos == std::string_view::npos) {
    return {};
  }
  pos += needle.size();
  // skip whitespace between ':' and '['
  while (pos < json.size() &&
         (json[pos] == ' ' || json[pos] == '\t' || json[pos] == '\n' || json[pos] == '\r')) {
    ++pos;
  }
  if (pos >= json.size() || json[pos] != '[') {
    return {};
  }
  auto const start = pos;
  size_t depth = 0;
  bool inString = false;
  for (; pos < json.size(); ++pos) {
    char const c = json[pos];
    if (inString) {
      if (c == '\\') {
        ++pos; // the escaped character can never terminate the string
      } else if (c == '"') {
        inString = false;
      }
      continue;
    }
    if (c == '"') {
      inString = true;
    } else if (c == '[') {
      ++depth;
    } else if (c == ']') {
      // depth is at least 1 here: the scan starts on the opening bracket.
      --depth;
      if (depth == 0) {
        return json.substr(start, pos - start + 1);
      }
    }
  }
  return {};
}
handleListMetrics(deviceName); + } else if (cmd == "subscribe") { + handleSubscribe(deviceName, extractArrayField(msg, "metrics")); + } else if (cmd == "unsubscribe") { + handleUnsubscribe(deviceName, extractArrayField(msg, "metrics")); + } else if (cmd == "subscribe_logs") { + handleSubscribeLogs(deviceName); + } else if (cmd == "unsubscribe_logs") { + handleUnsubscribeLogs(deviceName); + } +} + +void StatusWebSocketHandler::sendText(std::string const& json) +{ + std::vector outputs; + encode_websocket_frames(outputs, json.data(), json.size(), WebSocketOpCode::Text, 0); + mHandler->write(outputs); +} + +void StatusWebSocketHandler::sendSnapshot() +{ + auto const& specs = *mContext.specs; + auto const& infos = *mContext.infos; + + // Size subscription tables to current device count; grow lazily as needed. + mSubscribedMetrics.resize(specs.size()); + mLastLogSeq.resize(infos.size()); + for (size_t di = 0; di < infos.size(); ++di) { + mLastLogSeq[di] = infos[di].logSeq; + } + + std::string out; + out.reserve(512 + specs.size() * 128); + out += R"({"type":"snapshot","devices":[)"; + for (size_t di = 0; di < specs.size(); ++di) { + if (di > 0) { + out += ','; + } + auto const& info = infos[di]; + out += R"({"name":")"; + out += jsonEscape(specs[di].name); + out += R"(","pid":)"; + out += std::to_string(info.pid); + out += R"(,"active":)"; + out += info.active ? "true" : "false"; + out += R"(,"streamingState":")"; + out += streamingStateName(info.streamingState); + out += R"(","deviceState":")"; + out += jsonEscape(info.deviceState); + out += R"("})"; + } + out += "]}"; + sendText(out); +} + +void StatusWebSocketHandler::sendUpdate(size_t deviceIndex) +{ + auto const& specs = *mContext.specs; + auto const& metrics = *mContext.metrics; + + if (deviceIndex >= specs.size() || deviceIndex >= metrics.size()) { + return; + } + + // Lazily grow the subscription table if new devices were added after snapshot. 
+ if (mSubscribedMetrics.size() <= deviceIndex) { + mSubscribedMetrics.resize(deviceIndex + 1); + } + + auto const& subscribed = mSubscribedMetrics[deviceIndex]; + if (subscribed.empty()) { + return; + } + + auto const& info = metrics[deviceIndex]; + std::string metricsJson; + metricsJson += '{'; + bool first = true; + for (size_t mi = 0; mi < info.metrics.size(); ++mi) { + if (!info.changed[mi]) { + continue; + } + auto const& metric = info.metrics[mi]; + if (metric.type == MetricType::String || + metric.type == MetricType::Enum || + metric.type == MetricType::Unknown) { + continue; + } + auto const& label = info.metricLabels[mi]; + std::string_view labelSV{label.label, label.size}; + if (subscribed.find(std::string(labelSV)) == subscribed.end()) { + continue; + } + if (!first) { + metricsJson += ','; + } + first = false; + metricsJson += '"'; + metricsJson += jsonEscape(labelSV); + metricsJson += "\":"; + appendMetricValue(metricsJson, info, mi); + } + metricsJson += '}'; + + if (first) { + // Nothing subscribed changed in this cycle. 
+ return; + } + + std::string out; + out += R"({"type":"update","device":)"; + out += std::to_string(deviceIndex); + out += R"(,"name":")"; + out += jsonEscape(specs[deviceIndex].name); + out += R"(","metrics":)"; + out += metricsJson; + out += '}'; + sendText(out); +} + +void StatusWebSocketHandler::handleListMetrics(std::string_view deviceName) +{ + size_t di = findDeviceIndex(deviceName); + if (di == SIZE_MAX) { + return; + } + auto const& metrics = *mContext.metrics; + + std::string out; + out += R"({"type":"metrics_list","device":")"; + out += jsonEscape(deviceName); + out += R"(","metrics":[)"; + bool first = true; + if (di < metrics.size()) { + auto const& info = metrics[di]; + for (size_t mi = 0; mi < info.metrics.size(); ++mi) { + auto const& metric = info.metrics[mi]; + if (metric.type == MetricType::String || + metric.type == MetricType::Enum || + metric.type == MetricType::Unknown) { + continue; + } + if (!first) { + out += ','; + } + first = false; + auto const& label = info.metricLabels[mi]; + out += '"'; + out += jsonEscape({label.label, label.size}); + out += '"'; + } + } + out += "]}"; + sendText(out); +} + +void StatusWebSocketHandler::handleSubscribe(std::string_view deviceName, std::string_view metricsArr) +{ + size_t di = findDeviceIndex(deviceName); + if (di == SIZE_MAX || metricsArr.empty()) { + return; + } + if (mSubscribedMetrics.size() <= di) { + mSubscribedMetrics.resize(di + 1); + } + forEachStringInArray(metricsArr, [&](std::string_view name) { + mSubscribedMetrics[di].emplace(name); + }); +} + +void StatusWebSocketHandler::handleUnsubscribe(std::string_view deviceName, std::string_view metricsArr) +{ + size_t di = findDeviceIndex(deviceName); + if (di == SIZE_MAX || metricsArr.empty() || di >= mSubscribedMetrics.size()) { + return; + } + forEachStringInArray(metricsArr, [&](std::string_view name) { + mSubscribedMetrics[di].erase(std::string(name)); + }); +} + +size_t StatusWebSocketHandler::findDeviceIndex(std::string_view name) const 
+{ + auto const& specs = *mContext.specs; + for (size_t di = 0; di < specs.size(); ++di) { + if (specs[di].name == name) { + return di; + } + } + return SIZE_MAX; +} + +void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName) +{ + size_t di = findDeviceIndex(deviceName); + if (di == SIZE_MAX) { + return; + } + if (mLastLogSeq.size() <= di) { + mLastLogSeq.resize(di + 1, 0); + } + // Start the cursor at the current log position so we only push future lines. + mLastLogSeq[di] = (*mContext.infos)[di].logSeq; + mLogSubscriptions.insert(di); +} + +void StatusWebSocketHandler::handleUnsubscribeLogs(std::string_view deviceName) +{ + size_t di = findDeviceIndex(deviceName); + if (di == SIZE_MAX) { + return; + } + mLogSubscriptions.erase(di); +} + +void StatusWebSocketHandler::sendNewLogs(size_t deviceIndex) +{ + if (mLogSubscriptions.find(deviceIndex) == mLogSubscriptions.end()) { + return; + } + auto const& infos = *mContext.infos; + auto const& specs = *mContext.specs; + if (deviceIndex >= infos.size() || deviceIndex >= specs.size()) { + return; + } + if (mLastLogSeq.size() <= deviceIndex) { + mLastLogSeq.resize(deviceIndex + 1, 0); + } + + auto const& info = infos[deviceIndex]; + size_t newLines = info.logSeq - mLastLogSeq[deviceIndex]; + if (newLines == 0) { + return; + } + // Cap to buffer size to avoid re-reading overwritten entries. + if (newLines > info.history.size()) { + newLines = info.history.size(); + } + + size_t histSize = info.history.size(); + // The oldest unread entry sits at (historyPos - newLines + histSize) % histSize. 
+ size_t startPos = (info.historyPos + histSize - newLines) % histSize; + + std::string_view devName = specs[deviceIndex].name; + for (size_t i = 0; i < newLines; ++i) { + size_t pos = (startPos + i) % histSize; + std::string out; + out += R"({"type":"log","device":")"; + out += jsonEscape(devName); + out += R"(","level":")"; + out += logLevelName(info.historyLevel[pos]); + out += R"(","line":")"; + out += jsonEscape(info.history[pos]); + out += R"("})"; + sendText(out); + } + mLastLogSeq[deviceIndex] = info.logSeq; +} + +} // namespace o2::framework diff --git a/Framework/Core/src/StatusWebSocketHandler.h b/Framework/Core/src/StatusWebSocketHandler.h new file mode 100644 index 0000000000000..86a460e289440 --- /dev/null +++ b/Framework/Core/src/StatusWebSocketHandler.h @@ -0,0 +1,101 @@ +// Copyright 2019-2026 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef O2_FRAMEWORK_STATUSWEBSOCKETHANDLER_H_ +#define O2_FRAMEWORK_STATUSWEBSOCKETHANDLER_H_ + +#include "HTTPParser.h" +#include +#include +#include +#include +#include + +namespace o2::framework +{ +struct DriverServerContext; +struct WSDPLHandler; + +/// WebSocket handler for the /status endpoint. 
+/// +/// Protocol (client → driver): +/// {"cmd":"list_metrics","device":""} +/// → driver replies with {"type":"metrics_list","device":"","metrics":[...]} +/// +/// {"cmd":"subscribe","device":"","metrics":["m1","m2",...]} +/// → driver starts including those metrics in subsequent update frames +/// +/// {"cmd":"unsubscribe","device":"","metrics":["m1","m2",...]} +/// → driver stops sending those metrics +/// +/// {"cmd":"subscribe_logs","device":""} +/// → driver starts pushing new log lines for the device +/// +/// {"cmd":"unsubscribe_logs","device":""} +/// → driver stops pushing log lines for the device +/// +/// Protocol (driver → client): +/// {"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]} +/// → sent once on connect; contains no metrics or logs +/// +/// {"type":"update","device":,"name":"","metrics":{}} +/// → sent after each metrics cycle for devices with subscribed metrics that changed +/// +/// {"type":"metrics_list","device":"","metrics":["m1","m2",...]} +/// → reply to list_metrics command +/// +/// {"type":"log","device":"","level":"","line":""} +/// → pushed for each new log line from a subscribed device +struct StatusWebSocketHandler : public WebSocketHandler { + StatusWebSocketHandler(DriverServerContext& context, WSDPLHandler* handler); + ~StatusWebSocketHandler() override; + + /// Sends the minimal snapshot on handshake completion. + void headers(std::map const& headers) override; + /// Handles incoming commands from the MCP client. + void frame(char const* data, size_t s) override; + void beginChunk() override {} + void endChunk() override {} + void beginFragmentation() override {} + void endFragmentation() override {} + void control(char const* frame, size_t s) override {} + + /// Send a minimal JSON snapshot (device list + basic state, no metrics/logs). + void sendSnapshot(); + /// Push an update for device at @a deviceIndex. + /// Only metrics that are both changed[] and subscribed are included. 
+ /// No-op if nothing subscribed or nothing changed for this device. + void sendUpdate(size_t deviceIndex); + /// Push any log lines for @a deviceIndex that arrived since the last call. + /// No-op if the device is not subscribed for logs. + void sendNewLogs(size_t deviceIndex); + + private: + void sendText(std::string const& json); + void handleListMetrics(std::string_view deviceName); + void handleSubscribe(std::string_view deviceName, std::string_view metricsJson); + void handleUnsubscribe(std::string_view deviceName, std::string_view metricsJson); + void handleSubscribeLogs(std::string_view deviceName); + void handleUnsubscribeLogs(std::string_view deviceName); + size_t findDeviceIndex(std::string_view name) const; + + DriverServerContext& mContext; + WSDPLHandler* mHandler; + /// Per-device set of subscribed metric label strings. + /// Sized to specs->size() on sendSnapshot(); grows if new devices appear. + std::vector> mSubscribedMetrics; + /// Per-device log cursor: value of DeviceInfo::logSeq when we last sent logs. + std::vector mLastLogSeq; + /// Set of device indices whose logs are being streamed. 
+ std::unordered_set mLogSubscriptions; +}; + +} // namespace o2::framework +#endif // O2_FRAMEWORK_STATUSWEBSOCKETHANDLER_H_ diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 98cbf70370c3d..70f3c8940ef26 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -68,6 +68,7 @@ #include "Framework/DefaultsHelpers.h" #include "ProcessingPoliciesHelpers.h" #include "DriverServerContext.h" +#include "StatusWebSocketHandler.h" #include "HTTPParser.h" #include "DPLWebSocket.h" #include "ArrowSupport.h" @@ -891,6 +892,7 @@ void processChildrenOutput(uv_loop_t* loop, info.history[info.historyPos] = token; info.historyLevel[info.historyPos] = logLevel; info.historyPos = (info.historyPos + 1) % info.history.size(); + info.logSeq++; fmt::print("[{}:{}]: {}\n", info.pid, spec.id, token); } // We keep track of the maximum log error a @@ -1541,6 +1543,11 @@ int runStateMachine(DataProcessorSpecs const& workflow, uv_async_init(loop, serverContext.asyncLogProcessing, [](uv_async_t* handle) { auto* context = (DriverServerContext*)handle->data; processChildrenOutput(context->loop, *context->driver, *context->infos, *context->specs, *context->controls); + for (auto* statusHandler : context->statusHandlers) { + for (size_t di = 0; di < context->infos->size(); ++di) { + statusHandler->sendNewLogs(di); + } + } }); while (true) { From 84a03350dde7051de035e05e986d24207de6345e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Tue, 31 Mar 2026 20:55:54 +0200 Subject: [PATCH 2/2] DPL: add MCP server for DPL Allows debugging a DPL workflow using Claude, ChatGPT or similar tools. 
--- .../Core/scripts/dpl-mcp-server/README.md | 75 +++++ .../scripts/dpl-mcp-server/dpl_mcp_server.py | 304 ++++++++++++++++++ .../scripts/dpl-mcp-server/pyproject.toml | 19 ++ 3 files changed, 398 insertions(+) create mode 100644 Framework/Core/scripts/dpl-mcp-server/README.md create mode 100644 Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py create mode 100644 Framework/Core/scripts/dpl-mcp-server/pyproject.toml diff --git a/Framework/Core/scripts/dpl-mcp-server/README.md b/Framework/Core/scripts/dpl-mcp-server/README.md new file mode 100644 index 0000000000000..65d2378c5d756 --- /dev/null +++ b/Framework/Core/scripts/dpl-mcp-server/README.md @@ -0,0 +1,75 @@ +# DPL Status MCP Server + +An MCP server that connects to a running DPL driver's `/status` WebSocket endpoint and exposes its device state and metrics as tools for an AI assistant (e.g. Claude). + +## Requirements + +```bash +pip install mcp websockets +# or install the package directly: +pip install ./Framework/Core/scripts/dpl-mcp-server/ +``` + +## Running + +The driver port defaults to `8080`. 
Override with `--port`, `--pid`, or `DPL_STATUS_PORT`: + +```bash +python3 dpl_mcp_server.py --port 8080 +python3 dpl_mcp_server.py --pid 12345 # port = 8080 + pid % 30000 +DPL_STATUS_PORT=8080 python3 dpl_mcp_server.py +``` + +If installed as a package: + +```bash +dpl-mcp-server --pid $(pgrep -f diamond-workflow | head -1) +``` + +## Claude Code integration + +Add to `.mcp.json` in your project (or `~/.claude.json` for global use): + +```json +{ + "mcpServers": { + "dpl": { + "command": "dpl-mcp-server", + "args": ["--pid", "12345"] + } + } +} +``` + +Or with `claude mcp add`: + +```bash +claude mcp add dpl -- dpl-mcp-server --pid 12345 +``` + +## Available tools + +| Tool | Description | +|------|-------------| +| `list_devices` | List all devices with pid, active flag, streaming and device state | +| `list_metrics(device)` | List numeric metrics available for a device | +| `subscribe(device, metrics)` | Subscribe to metrics; driver will push updates when they change | +| `unsubscribe(device, metrics)` | Stop receiving updates for specific metrics | +| `get_updates(max_updates)` | Drain buffered update frames (default: up to 50) | +| `subscribe_logs(device)` | Subscribe to log output; driver will push new log lines for the device | +| `unsubscribe_logs(device)` | Stop receiving log lines for the device | +| `get_logs(max_lines)` | Drain buffered log lines (default: up to 100) | + +## Protocol + +The driver sends a snapshot on connect, then pushes updates only for subscribed metrics that changed in each processing cycle, plus new log lines for devices subscribed with `subscribe_logs`. There is no polling — updates arrive in real time as the workflow runs. 
+ +``` +connect → {"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]} + +client → {"cmd":"list_metrics","device":"producer"} +driver → {"type":"metrics_list","device":"producer","metrics":["input-parts","output-bytes",...]} + +client → {"cmd":"subscribe","device":"producer","metrics":["output-bytes"]} +driver → {"type":"update","device":0,"name":"producer","metrics":{"output-bytes":1048576}} + (pushed every cycle in which output-bytes changed) + +client → {"cmd":"unsubscribe","device":"producer","metrics":["output-bytes"]} +``` diff --git a/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py b/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py new file mode 100644 index 0000000000000..bc04acf026188 --- /dev/null +++ b/Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python3 +# Copyright 2019-2026 CERN and copyright holders of ALICE O2. +# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +# All rights not expressly granted are reserved. +# +# This software is distributed under the terms of the GNU General Public +# License v3 (GPL Version 3), copied verbatim in the file "COPYING". +# +# In applying this license CERN does not waive the privileges and immunities +# granted to it by virtue of its status as an Intergovernmental Organization +# or submit itself to any jurisdiction. +"""DPL status MCP server. + +Bridges the DPL driver /status WebSocket endpoint to MCP tools so that an +AI assistant (e.g. Claude) can inspect and monitor a running DPL workflow. 
async def _ensure_connected() -> None:
    """Connect (or reconnect) to the driver's /status WebSocket.

    An existing connection is probed with a ping and reused if the pong
    arrives within 2 seconds. Otherwise the stale reader task is cancelled,
    the stale socket is closed, and a new connection plus reader task are
    created.

    Subscriptions are kept by the driver per connection, so they have to be
    issued again after a reconnect.

    Raises whatever ``websockets.connect`` raises when the driver cannot be
    reached.
    """
    global _ws, _reader_task

    ws = _ws
    if ws is not None:

        async def _probe() -> None:
            # ping() only sends the frame and hands back a waiter; the pong
            # has arrived once that waiter completes.
            pong_waiter = await ws.ping()
            await pong_waiter

        try:
            # The timeout must cover the wait for the pong, not just the send,
            # otherwise a dead peer blocks the caller indefinitely.
            await asyncio.wait_for(_probe(), timeout=2.0)
            return
        except Exception:
            _ws = None
            stale_reader, _reader_task = _reader_task, None
            if stale_reader is not None and not stale_reader.done():
                stale_reader.cancel()
                # Let the reader finish its `finally` (which resets _ws) now,
                # so it cannot clear the connection established below.
                await asyncio.gather(stale_reader, return_exceptions=True)
            try:
                # Best effort: release the socket without waiting long on a
                # peer that is already known to be unresponsive.
                await asyncio.wait_for(ws.close(), timeout=1.0)
            except Exception:
                pass

    url = f"ws://localhost:{_port}/status"
    _ws = await websockets.connect(url, subprotocols=["dpl"])
    if _reader_task is None or _reader_task.done():
        _reader_task = asyncio.create_task(_reader())
+ lines = [] + for d in devices: + lines.append( + f"{d['name']}: pid={d['pid']} active={d['active']} " + f"streaming={d['streamingState']} state={d['deviceState']}" + ) + return "\n".join(lines) + + +@mcp.tool() +async def list_metrics(device: str) -> str: + """List the available numeric metrics for a DPL device. + + Sends a list_metrics command to the driver and waits up to 3 seconds for + the reply. Only numeric metrics (int, float, uint64) are included; string + and enum metrics are excluded. + + Args: + device: Device name exactly as shown by list_devices. + """ + # Remove any stale cached result so we can detect the fresh reply. + _metrics_lists.pop(device, None) + await _send({"cmd": "list_metrics", "device": device}) + for _ in range(60): # up to 3 s + await asyncio.sleep(0.05) + if device in _metrics_lists: + names = _metrics_lists[device] + if not names: + return f"Device '{device}' has no numeric metrics yet." + return f"{len(names)} metric(s): " + ", ".join(names) + return f"No reply from driver for device '{device}' (timeout)." + + +@mcp.tool() +async def subscribe(device: str, metrics: list[str]) -> str: + """Subscribe to one or more metrics for a DPL device. + + After subscribing, the driver will push update frames for the device + whenever any of the subscribed metrics change. Use get_updates to drain + the buffer. + + Args: + device: Device name exactly as shown by list_devices. + metrics: List of metric names to subscribe to (from list_metrics). + """ + await _send({"cmd": "subscribe", "device": device, "metrics": metrics}) + return f"Subscribed to {len(metrics)} metric(s) for '{device}': {', '.join(metrics)}" + + +@mcp.tool() +async def unsubscribe(device: str, metrics: list[str]) -> str: + """Stop receiving updates for specific metrics of a DPL device. + + Args: + device: Device name exactly as shown by list_devices. + metrics: List of metric names to unsubscribe from. 
+ """ + await _send({"cmd": "unsubscribe", "device": device, "metrics": metrics}) + return f"Unsubscribed from {len(metrics)} metric(s) for '{device}'." + + +@mcp.tool() +async def subscribe_logs(device: str) -> str: + """Subscribe to log output for a DPL device. + + After subscribing, new log lines from the device will be buffered and + can be retrieved with get_logs(). + + Args: + device: Device name exactly as shown by list_devices. + """ + await _send({"cmd": "subscribe_logs", "device": device}) + return f"Subscribed to logs for '{device}'." + + +@mcp.tool() +async def unsubscribe_logs(device: str) -> str: + """Stop receiving log output for a DPL device. + + Args: + device: Device name exactly as shown by list_devices. + """ + await _send({"cmd": "unsubscribe_logs", "device": device}) + return f"Unsubscribed from logs for '{device}'." + + +@mcp.tool() +async def get_logs(max_lines: int = 100) -> str: + """Drain and return buffered log lines received since the last call. + + Args: + max_lines: Maximum number of log lines to return (default 100). + """ + await _ensure_connected() + batch = _logs[:max_lines] + del _logs[:max_lines] + if not batch: + return "No buffered log lines." + lines = [] + for entry in batch: + device = entry.get("device", "?") + level = entry.get("level", "?") + line = entry.get("line", "") + lines.append(f"[{device}][{level}] {line}") + return "\n".join(lines) + + +@mcp.tool() +async def get_updates(max_updates: int = 50) -> str: + """Drain and return buffered metric update frames received since the last call. + + Each frame contains the latest values of all subscribed metrics that + changed during that processing cycle. Calling this repeatedly gives a + time-ordered view of metric evolution. + + Args: + max_updates: Maximum number of update frames to return (default 50). + """ + await _ensure_connected() + batch = _updates[:max_updates] + del _updates[:max_updates] + if not batch: + return "No buffered updates." 
+ lines = [] + for upd in batch: + name = upd.get("name") or f"device[{upd.get('device', '?')}]" + metrics = upd.get("metrics", {}) + if metrics: + parts = ", ".join(f"{k}={v}" for k, v in metrics.items()) + lines.append(f"{name}: {parts}") + else: + lines.append(f"{name}: (empty update)") + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- +def main() -> None: + global _port + + parser = argparse.ArgumentParser( + description="DPL status MCP server — expose DPL driver metrics via MCP tools" + ) + group = parser.add_mutually_exclusive_group() + group.add_argument( + "--port", + type=int, + default=None, + help="TCP port of the DPL driver status WebSocket (default: 8080 or DPL_STATUS_PORT env var)", + ) + group.add_argument( + "--pid", + type=int, + default=None, + help="PID of the DPL driver process; port is derived as 8080 + pid %% 30000", + ) + args = parser.parse_args() + + if args.pid is not None: + _port = 8080 + args.pid % 30000 + elif args.port is not None: + _port = args.port + elif "DPL_STATUS_PORT" in os.environ: + _port = int(os.environ["DPL_STATUS_PORT"]) + # else leave _port at the default 8080 + + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/Framework/Core/scripts/dpl-mcp-server/pyproject.toml b/Framework/Core/scripts/dpl-mcp-server/pyproject.toml new file mode 100644 index 0000000000000..f87c1b770c934 --- /dev/null +++ b/Framework/Core/scripts/dpl-mcp-server/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "dpl-mcp-server" +version = "0.1.0" +description = "MCP server for monitoring DPL (Data Processing Layer) workflows" +requires-python = ">=3.11" +dependencies = [ + "mcp>=1.0.0", + "websockets>=12.0", +] + +[project.scripts] +dpl-mcp-server = "dpl_mcp_server:main" + +[tool.hatch.build.targets.wheel] 
+include = ["dpl_mcp_server.py"]