Skip to content
Open
224 changes: 192 additions & 32 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import types
import weakref
import errno
from contextlib import contextmanager

from queue import Empty, Full
from queue import Empty, Full, ShutDown

from . import connection
from . import context
Expand All @@ -45,21 +46,37 @@ def __init__(self, maxsize=0, *, ctx):
else:
self._wlock = ctx.Lock()
self._sem = ctx.BoundedSemaphore(maxsize)

self._lock_shutdown = ctx.Lock()
# Cannot use a ctx.Value because 'ctypes' library is
# not always available on all Linux platforms.
# Use of Semaphores instead of an array from `heap.BufferWrapper'
# is here more efficient and explicit.
self._sem_flag_shutdown = ctx.Semaphore(0)
self._sem_flag_shutdown_immediate = ctx.Semaphore(0)
self._sem_pending_getters = ctx.Semaphore(0)
self._sem_pending_putters = ctx.Semaphore(0)

# For use by concurrent.futures
self._ignore_epipe = False
self._reset()

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._lock_shutdown,
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
self._sem_pending_getters, self._sem_pending_putters)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._lock_shutdown,
self._sem_flag_shutdown, self._sem_flag_shutdown_immediate,
self._sem_pending_getters, self._sem_pending_putters) = state
self._reset()

def _after_fork(self):
Expand All @@ -81,43 +98,101 @@ def _reset(self, after_fork=False):
self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll

def _is_shutdown(self):
return not self._sem_flag_shutdown.locked()

def _set_shutdown(self, immediate=False):
self._sem_flag_shutdown.release()
if immediate:
self._sem_flag_shutdown_immediate.release()

@contextmanager
def _handle_pending_processes(self, sem):
# Count pending getter or putter processes in a dedicated
# semaphore. These 2 semaphores are only used when queue
# shuts down to release one by one all pending processes.
sem.release()
try:
# Wraps potentialy blocking calls:
# sem._sem.acquire() in put method,
# _recv_bytes()/_poll(*args) in get method.
yield
finally:
sem.acquire()

def _release_pending_putters(self):
with self._lock_shutdown:
if not self._sem_pending_putters.locked():
self._sem.release()

def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full

if self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_putters):
if not self._sem.acquire(block, timeout):
raise Full
finally:
if self._is_shutdown():
self._release_pending_putters()
raise ShutDown

with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

def _release_pending_getters(self):
with self._lock_shutdown:
if not self._sem_pending_getters.locked():
self._put_sentinel()

def get(self, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):

if (empty := self.empty()) and self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_getters):
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
raise Empty
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
finally:
if self._is_shutdown() and empty:
self._release_pending_getters()
raise ShutDown

item = _ForkingPickler.loads(res)
if self._is_shutdown() \
and isinstance(item, _ShutdownSentinel):
# A pending getter process is just unblocked,
# Unblock a next one if exists.
self._release_pending_getters()
raise ShutDown

return item

def qsize(self):
# Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
Expand All @@ -135,6 +210,57 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def _clear(self):
with self._rlock:
while self._poll():
self._recv_bytes()

def _put_sentinel(self):
# When put a sentinel into an empty queue,
# dont forget to call to _sem.acquire in order to
# maintain a correct count of acquire/release
# calls for BoudedSempaphore.
self._sem.acquire()

with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(_sentinel_shutdown)
self._notempty.notify()

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")

with self._lock_shutdown:
if self._is_shutdown():
raise RuntimeError(f"Queue {self!r} already shut down")

is_pending_getters = not self._sem_pending_getters.locked()
is_pending_putters = not self._sem_pending_putters.locked()
str_shutdown = f"shutdown -> immediate:{immediate}"
str_shutdown += f"/PGetters:{is_pending_getters}" \
f"/PPutters:{is_pending_putters}" \
f"/Empty:{self.empty()}" \
f"/Full:{self.full()}"
debug(str_shutdown)
self._set_shutdown(immediate)

# Shut down is immediatly and there is no pending getter,
# we purge the queue (pipe). If there are datas into the buffer
# the 'feeder' thread should erase all of them.
if immediate and not is_pending_getters:
self._clear()

# Starting release one pending getter process.
# Put a first shutdown sentinel data into the pipe.
if self.empty() and is_pending_getters:
self._put_sentinel()

# Starting release one pending putter processes.
if is_pending_putters:
self._sem.release()

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -180,7 +306,7 @@ def _start_thread(self):
args=(self._buffer, self._notempty, self._send_bytes,
self._wlock, self._reader.close, self._writer.close,
self._ignore_epipe, self._on_queue_feeder_error,
self._sem),
self._sem, self._sem_flag_shutdown_immediate),
name='QueueFeederThread',
daemon=True,
)
Expand Down Expand Up @@ -228,7 +354,8 @@ def _finalize_close(buffer, notempty):

@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, reader_close,
writer_close, ignore_epipe, onerror, queue_sem):
writer_close, ignore_epipe, onerror, queue_sem,
flag_shutdown_immediate):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
Expand All @@ -240,7 +367,7 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
wrelease = writelock.release
else:
wacquire = None

is_shutdown_immediate = lambda: not flag_shutdown_immediate.locked()
while 1:
try:
nacquire()
Expand All @@ -258,6 +385,14 @@ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
writer_close()
return

# When queue shuts down immediatly, do not insert
# regular data in pipe, only shutdown sentinel.
if is_shutdown_immediate() \
and not isinstance(obj, _ShutdownSentinel):
debug("Queue shuts down immediatly, " \
"don't feed regular data to pipe")
continue

# serialize the data before acquiring the lock
obj = _ForkingPickler.dumps(obj)
if wacquire is None:
Expand Down Expand Up @@ -301,6 +436,12 @@ def _on_queue_feeder_error(e, obj):
__class_getitem__ = classmethod(types.GenericAlias)


# Sentinel item used to release pending getter processes
# when queue shuts down.
class _ShutdownSentinel: pass
_sentinel_shutdown = _ShutdownSentinel()


_sentinel = object()

#
Expand Down Expand Up @@ -328,8 +469,16 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._sem.acquire(block, timeout):
raise Full
if self._is_shutdown():
raise ShutDown
try:
with self._handle_pending_processes(self._sem_pending_putters):
if not self._sem.acquire(block, timeout):
raise Full
finally:
if self._is_shutdown():
self._release_pending_putters()
raise ShutDown

with self._notempty, self._cond:
if self._thread is None:
Expand All @@ -350,6 +499,17 @@ def join(self):
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

def _clear(self):
super()._clear()

# Data could be in the buffer, not in the pipe.
# Call acquire method of '_unfinished_tasks' Semaphore
# until counter is zero.
with self._cond:
while not self._unfinished_tasks.locked():
self._unfinished_tasks.acquire(block=False)
self._cond.notify_all()

#
# Simplified Queue type -- really just a locked pipe
#
Expand Down
Loading
Loading