Skip to content Skip to sidebar Skip to footer

Python - What Is Queue.task_done() Used For?

I wrote a script that has multiple threads (created with threading.Thread) fetching URLs from a Queue using queue.get_nowait(), and then processing the HTML. I am new to multi-thre

Solution 1:

Queue.task_done is not there for the workers' benefit. It is there to support Queue.join.


If I give you a box of work assignments, do I care about when you've taken everything out of the box?

No. I care about when the work is done. Looking at an empty box doesn't tell me that. You and 5 other guys might still be working on stuff you took out of the box.

Queue.task_done lets workers say when a task is done. Someone waiting for all the work to be done with Queue.join will wait until enough task_done calls have been made, not when the queue is empty.


eigenfield points out in the comments that it seems really weird for a queue to have task_done/join methods. That's true, but it's really a naming problem. The queue module has bad name choices that make it sound like a general-purpose queue library, when it's really a thread communication library.

It'd be weird for a general-purpose queue to have task_done/join methods, but it's entirely reasonable for an inter-thread message channel to have a way to indicate that messages have been processed. If the class was called thread_communication.MessageChannel instead of queue.Queue and task_done was called message_processed, the intent would be a lot clearer.

(If you need a general-purpose queue rather than an inter-thread message channel, use collections.deque.)

Solution 2:

.task_done() is used to mark .join() that the processing is done.

💡 If you use .join() and don't call .task_done() for every processed item, your script will hang forever.


Ain't nothin' like a short example;

import logging
import queue
import threading
import time

items_queue = queue.Queue()
running = Falsedefitems_queue_worker():
    while running:
        try:
            item = items_queue.get(timeout=0.01)
            if item isNone:
                continuetry:
                process_item(item)
            finally:
                items_queue.task_done()

        except queue.Empty:
            passexcept:
            logging.exception('error while processing item')


defprocess_item(item):
    print('processing {} started...'.format(item))
    time.sleep(0.5)
    print('processing {} done'.format(item))


if __name__ == '__main__':
    running = True# Create 10 items_queue_worker threads
    worker_threads = 10for _ inrange(worker_threads):
        threading.Thread(target=items_queue_worker).start()

    # Populate your queue with datafor i inrange(100):
        items_queue.put(i)

    # Wait for all items to finish processing
    items_queue.join()

    running = False

Solution 3:

"Read the source, Luke!" -- Obi-one Codobi

The source for ayncio.queue is pretty short.

  • the number of unfinished tasks goes up by one when you put to the queue.
  • it goes down by one with you call task_done
  • join() awaits there being no unfinished tasks.

This makes join useful if and only if you are calling task_done(). Using the classic bank analogy:

  • people come in the doors and get in line; door is a producer doing a q.put()
  • when a teller is idle and a person is in line, they go to the teller window. teller does a q.get().
  • When the teller has finished helping the person, they are ready for the next one. teller does a q.task_done()
  • at 5 p.m., the doors are locked door task finishes
  • you wait until both the line is empty and each teller has finished helping the person in front of them. await q.join(tellers)
  • then you send the tellers home, who are now all idling with an empty queue. for teller in tellers: teller.cancel()

Without the task_done(), you cannot know every teller is done with people. You cannot send a teller home while they have a person at his or her window.

Solution 4:

Could someone provide me with a code example (ideally using urllib, file I/O, or something other than fibonacci numbers and printing "Hello") that shows me how this function would be used in practical applications?

@user2357112's answer nicely explains the purpose of task_done, but lacks the requested example. Here is a function that calculates checksums of an arbitrary number of files and returns a dict mapping each file name to the corresponding checksum. Internal to the function, the work is divided among a several threads.

The function uses of Queue.join to wait until the workers have finished their assigned tasks, so it is safe to return the dictionary to the caller. It is a convenient way to wait for all files being processed, as opposed to them being merely dequeued.

import threading, queue, hashlib

def_work(q, checksums):
    whileTrue:
        filename = q.get()
        if filename isNone:
            q.put(None)
            breaktry:
            sha = hashlib.sha256()
            withopen(filename, 'rb') as f:
                for chunk initer(lambda: f.read(65536), b''):
                    sha.update(chunk)
            checksums[filename] = sha.digest()
        finally:
            q.task_done()

defcalc_checksums(files):
    q = queue.Queue()
    checksums = {}
    for i inrange(1):
        threading.Thread(target=_work, args=(q, checksums)).start()
    for f in files:
        q.put(f)
    q.join()
    q.put(None)  # tell workers to exitreturn checksums

A note on the GIL: since the code in hashlib internally releases the GIL while calculating the checksum, using multiple threads yields a measurable (1.75x-2x depending on Python version) speedup compared to the single-threaded variant.

Post a Comment for "Python - What Is Queue.task_done() Used For?"