Python · Advanced · 45 minutes

Build a Concurrent Task Scheduler with Priority Queues

Create a Python mini-project that simulates a concurrent task scheduler managing tasks with different priorities. The scheduler should execute tasks in priority order and support concurrent execution with a fixed number of worker threads.

Challenge prompt

Design and implement a task scheduler in Python that handles tasks with different priority levels. The scheduler must run the highest-priority task first (in the example usage, a lower number means a higher priority) and execute tasks concurrently on a fixed pool of worker threads. Tasks are functions with varying execution times. The scheduler should accept new tasks dynamically and report when each task starts and finishes. Ensure thread-safe access to internal data and a clean shutdown of the workers once all tasks have finished.

Guidance

  • Use Python's 'queue.PriorityQueue' to order tasks by priority. It returns the lowest-valued entry first, so a smaller number means a higher priority.
  • Use the 'threading' module to create a fixed pool of worker threads.
  • 'queue.PriorityQueue' does its own locking for put() and get(); protect any other shared state (counters, result lists) with a lock.
  • Shut the worker threads down gracefully once all tasks are completed.
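The guidance above can be sketched in a few lines. This is an illustration under stated assumptions, not the reference solution: `run_pool`, `processed`, and `processed_lock` are hypothetical names, and the sketch assumes all jobs are queued before the workers start, so a worker may simply exit when the queue is empty.

```python
import queue
import threading


def run_pool(jobs, num_workers=2):
    """Drain (priority, name) jobs with a fixed pool; return names as processed."""
    task_queue = queue.PriorityQueue()  # put() and get() are already thread-safe
    processed = []
    processed_lock = threading.Lock()   # guards the shared result list

    for job in jobs:
        task_queue.put(job)

    def worker():
        while True:
            try:
                priority, name = task_queue.get_nowait()
            except queue.Empty:
                return  # nothing left: this worker exits
            with processed_lock:
                processed.append(name)
            task_queue.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    task_queue.join()   # blocks until every job has been marked done
    for t in threads:
        t.join()
    return processed


print(run_pool([(2, "medium"), (1, "urgent"), (3, "low")], num_workers=1))
# → ['urgent', 'medium', 'low']
```

With more than one worker the jobs are still dequeued in priority order, but the order in which workers append to `processed` can interleave. Exiting on an empty queue does not support adding tasks while the pool runs; that case needs a sentinel or an event.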

Hints

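  • Represent tasks as tuples (priority, task_id, task_function). Keep task_id unique (or add a sequence counter) so that a tie in priority is never resolved by comparing the functions themselves, which raises TypeError.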
  • Use threading.Event or a sentinel value to signal workers to exit after processing all tasks.
  • Consider using thread-safe print or logging to output start and finish times in real-time.
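The hints above might be combined as follows. This is a sketch, not the intended solution: `make_entry`, `worker`, `run_with_sentinels`, and the `done` list are hypothetical names, and using `float("inf")` as the sentinel priority is an assumption made here so that sentinels sort after every real task. The `logging` module serializes writes to a handler, which keeps lines from different threads intact.

```python
import itertools
import logging
import queue
import threading

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(threadName)s %(message)s")

_counter = itertools.count()  # unique tie-breaker, so functions are never compared


def make_entry(priority, func):
    """Build a queue entry; func=None marks a shutdown sentinel."""
    return (priority, next(_counter), func)


def worker(task_queue, done):
    while True:
        priority, seq, func = task_queue.get()
        if func is None:          # sentinel: stop this worker
            task_queue.task_done()
            break
        logging.info("start seq=%d priority=%s", seq, priority)
        func()
        logging.info("finish seq=%d", seq)
        done.append(seq)          # list.append is atomic in CPython
        task_queue.task_done()


def run_with_sentinels(tasks, num_workers=2):
    """tasks is an iterable of (priority, callable) pairs."""
    task_queue = queue.PriorityQueue()
    done = []
    threads = [threading.Thread(target=worker, args=(task_queue, done))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for priority, func in tasks:
        task_queue.put(make_entry(priority, func))
    for _ in threads:             # one sentinel per worker, sorted last
        task_queue.put(make_entry(float("inf"), None))
    for t in threads:
        t.join()
    return done
```

Because each entry carries a unique sequence number, two tasks with the same priority compare on that number and run in insertion order relative to each other.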

Starter code

import threading
import queue
import time

def example_task(duration, task_name):
    print(f"Starting {task_name}")
    time.sleep(duration)
    print(f"Finished {task_name}")

class TaskScheduler:
    def __init__(self, num_workers):
        self.task_queue = queue.PriorityQueue()
        self.workers = []
        self.num_workers = num_workers

    def add_task(self, priority, task_id, task_func):
        # Add a task to the priority queue
        pass

    def worker(self):
        while True:
            # Get the next task and execute
            pass

    def run(self):
        # Start worker threads
        pass

    def wait_completion(self):
        # Wait for all tasks to complete and shutdown workers
        pass

# Example usage:
scheduler = TaskScheduler(num_workers=3)
scheduler.add_task(2, 'task1', lambda: example_task(2, 'task1'))
scheduler.add_task(1, 'task2', lambda: example_task(1, 'task2'))
scheduler.add_task(3, 'task3', lambda: example_task(3, 'task3'))
scheduler.run()
scheduler.wait_completion()
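One possible way to fill in the starter class is sketched below. It is a hedged illustration rather than the expected answer: the `_stop` event, the `_seq` counter, the `log` list, the `_record` helper, and the 0.05-second polling timeout are all assumptions introduced here. It uses the threading.Event variant of shutdown, and records elapsed times instead of printing them so that the results are easy to inspect.

```python
import itertools
import queue
import threading
import time


class TaskScheduler:
    def __init__(self, num_workers):
        self.task_queue = queue.PriorityQueue()
        self.workers = []
        self.num_workers = num_workers
        self._stop = threading.Event()   # set once all tasks are done
        self._seq = itertools.count()    # tie-breaker for equal priorities
        self._lock = threading.Lock()    # guards _seq and log
        self.log = []                    # (event, task_id, seconds since creation)
        self._t0 = time.monotonic()

    def _record(self, event, task_id):
        with self._lock:
            self.log.append((event, task_id, time.monotonic() - self._t0))

    def add_task(self, priority, task_id, task_func):
        with self._lock:
            seq = next(self._seq)
        # Lower priority number is dequeued first.
        self.task_queue.put((priority, seq, task_id, task_func))

    def worker(self):
        while not self._stop.is_set():
            try:
                _, _, task_id, task_func = self.task_queue.get(timeout=0.05)
            except queue.Empty:
                continue  # re-check the stop event, then poll again
            try:
                self._record("start", task_id)
                task_func()
                self._record("finish", task_id)
            finally:
                self.task_queue.task_done()

    def run(self):
        for i in range(self.num_workers):
            t = threading.Thread(target=self.worker, name=f"worker-{i}")
            t.start()
            self.workers.append(t)

    def wait_completion(self):
        self.task_queue.join()  # every queued task has finished
        self._stop.set()        # tell idle workers to exit
        for t in self.workers:
            t.join()
```

Polling with a timeout lets tasks be added while the workers are running, at the cost of a short delay at shutdown. A sentinel-based design avoids the polling but needs one sentinel per worker.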

Expected output

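With num_workers=3, all three tasks are dequeued almost immediately (in priority order: task2, task1, task3) and run concurrently, so they finish in order of duration. The relative order of the "Starting" lines may vary slightly between runs:

Starting task2
Starting task1
Starting task3
Finished task2
Finished task1
Finished task3

With num_workers=1, the tasks run strictly one after another in priority order:

Starting task2
Finished task2
Starting task1
Finished task1
Starting task3
Finished task3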

Core concepts

threading, priority queue, concurrency, thread-safe programming
