Build a Concurrent Task Scheduler with Priority Queues
Create a Python mini-project that simulates a concurrent task scheduler managing tasks with different priorities. The scheduler should execute tasks in priority order and support concurrent execution with a fixed number of worker threads.
Challenge prompt
Design and implement a task scheduler in Python that handles multiple tasks assigned with different priority levels. The scheduler must execute tasks in order of highest priority first and support concurrent execution using a fixed pool of worker threads. Tasks are represented as functions with variable execution times. Your scheduler should allow adding new tasks dynamically and should output the start and finish times of each task. Ensure thread-safe access to the internal data and proper shutdown of workers once all tasks finish.
Guidance
- • Use Python's 'queue.PriorityQueue' to manage tasks based on priority.
- • Utilize the 'threading' module to create a pool of worker threads.
- • Ensure thread-safe operations when adding and removing tasks from the queue.
- • Implement graceful shutdown for worker threads once all tasks are completed.
Hints
- • Represent tasks as tuples (priority, task_id, task_function) to ensure reliable priority sorting.
- • Use threading.Event or a sentinel value to signal workers to exit after processing all tasks.
- • Consider using thread-safe print or logging to output start and finish times in real-time.
Starter code
import threading
import queue
import time
def example_task(duration, task_name):
print(f"Starting {task_name}")
time.sleep(duration)
print(f"Finished {task_name}")
class TaskScheduler:
def __init__(self, num_workers):
self.task_queue = queue.PriorityQueue()
self.workers = []
self.num_workers = num_workers
def add_task(self, priority, task_id, task_func):
# Add a task to the priority queue
pass
def worker(self):
while True:
# Get the next task and execute
pass
def run(self):
# Start worker threads
pass
def wait_completion(self):
# Wait for all tasks to complete and shutdown workers
pass
# Example usage:
scheduler = TaskScheduler(num_workers=3)
scheduler.add_task(2, 'task1', lambda: example_task(2, 'task1'))
scheduler.add_task(1, 'task2', lambda: example_task(1, 'task2'))
scheduler.add_task(3, 'task3', lambda: example_task(3, 'task3'))
scheduler.run()
scheduler.wait_completion()Expected output
Starting task2 Finished task2 Starting task1 Finished task1 Starting task3 Finished task3
Core concepts
Challenge a Friend
Send this duel to someone else and see if they can solve it.