From f11e2700a15b85cda0c47326d1a6ff9fdc098d63 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Sat, 10 Aug 2024 16:38:50 +0200 Subject: [PATCH] Make AsyncQueue thread safe Mutex should avoid concurrent appending and flushing of the queue. It would be probably better to use a lockless priority queue, good enough for now. --- Framework/Core/src/AsyncQueue.cxx | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/Framework/Core/src/AsyncQueue.cxx b/Framework/Core/src/AsyncQueue.cxx index 36c0c17009f82..bce2f31e57de7 100644 --- a/Framework/Core/src/AsyncQueue.cxx +++ b/Framework/Core/src/AsyncQueue.cxx @@ -15,10 +15,13 @@ O2_DECLARE_DYNAMIC_LOG(async_queue); +std::mutex gAsyncQueueMutex; + namespace o2::framework { auto AsyncQueueHelpers::create(AsyncQueue& queue, AsyncTaskSpec spec) -> AsyncTaskId { + std::lock_guard guard(gAsyncQueueMutex); AsyncTaskId id; id.value = queue.prototypes.size(); queue.prototypes.push_back(spec); @@ -32,11 +35,14 @@ auto AsyncQueueHelpers::post(AsyncQueue& queue, AsyncTaskId id, AsyncCallback ta taskToPost.id = id; taskToPost.timeslice = timeslice; taskToPost.debounce = debounce; + + std::lock_guard guard(gAsyncQueueMutex); queue.tasks.push_back(taskToPost); } auto AsyncQueueHelpers::run(AsyncQueue& queue, TimesliceId oldestPossible) -> void { + std::lock_guard guard(gAsyncQueueMutex); if (queue.tasks.empty()) { return; } @@ -126,6 +132,7 @@ auto AsyncQueueHelpers::run(AsyncQueue& queue, TimesliceId oldestPossible) -> vo auto AsyncQueueHelpers::reset(AsyncQueue& queue) -> void { + std::lock_guard guard(gAsyncQueueMutex); queue.tasks.clear(); queue.iteration = 0; }