Skip to content Skip to sidebar Skip to footer

Data Type For A "closable" Queue To Handle A Stream Of Items For Multiple Producers And Consumers

Is there a specific type of Queue that is 'closable', and is suitable for when there are multiple producers, consumers, and the data comes from a stream (so its not known when it w

Solution 1:

I'd call that a self-latching queue.

For your primary requirement, combine the queue with a condition variable check that gracefully latches (shuts down) the queue when all producers have vacated:

classSelfLatchingQueue(LatchingQueue):
  ...
  def__init__(self, num_producers):
    ...

  defclose(self):
    '''Called by a producer to indicate that it is done producing'''

    ... perhaps check that current thread is a known producer? ...

    with self.a_mutex:self._num_active_producers -= 1ifself._num_active_producers <= 0:
        # Future put()s throw QueueLatched. get()s will empty the queue# and then throw QueueEmpty thereafterself.latch() # Guess what superclass implements this?

For your secondary requirement (#3 in the original post, finished producers apparently block until all consumers are finished), I'd perhaps use a barrier or just another condition variable. This could be implemented in a subclass of the SelfLatchingQueue, of course, but without knowing the codebase I'd keep this behavior separate from the automatic latching.

Post a Comment for "Data Type For A "closable" Queue To Handle A Stream Of Items For Multiple Producers And Consumers"