/// <summary>
/// Tests for <see cref="QueueRunner{T}"/>.
/// </summary>
[TestFixture]
public class QueueRunnerTest
{
    // Incremented by Work on worker threads, so it is only updated through Interlocked.
    private int _count = 0;

    /// <summary>
    /// Runs 100 queued items through the default runner and expects every item to be processed.
    /// </summary>
    [Test]
    public void Test100ExpectSuccess()
    {
        Queue<int> queue = new Queue<int>();
        for (int i = 0; i < 100; i++)
        {
            queue.Enqueue(i);
        }
        _count = 0;

        QueueRunner<int> runner = new QueueRunner<int>(Work, queue);
        runner.RunAndWait();

        Assert.That(_count, Is.EqualTo(100));
    }

    /// <summary>
    /// Regression test: a queue holding fewer items than the thread limit must not
    /// make the dispatcher dequeue from an empty queue.
    /// </summary>
    [Test]
    public void TestFewerItemsThanThreadsExpectSuccess()
    {
        Queue<int> queue = new Queue<int>();
        for (int i = 0; i < 3; i++)
        {
            queue.Enqueue(i);
        }
        _count = 0;

        QueueRunner<int> runner = new QueueRunner<int>(Work, queue, 5);
        runner.RunAndWait();

        Assert.That(_count, Is.EqualTo(3));
    }

    /// <summary>
    /// Worker callback: logs the item and counts it.
    /// </summary>
    private void Work(object obj)
    {
        Console.WriteLine("Working " + obj);
        Interlocked.Increment(ref _count);
    }
}

/// <summary>
/// Drains a <see cref="Queue{T}"/> by handing each item to a worker delegate on its own
/// thread, keeping at most a fixed number of worker threads running at once.
/// </summary>
/// <typeparam name="T">Type of the queued items; each item is passed to the delegate as <see cref="object"/>.</typeparam>
class QueueRunner<T>
{
    // Guards _queue, _running and _toBeCompleted, and is the monitor that the
    // dispatcher and the waiting caller block on.
    private readonly object _syncLock = new object();
    private readonly Queue<T> _queue;
    private readonly int _maxThreads;
    private readonly ParameterizedThreadStart _workerDelegate;

    // Number of worker threads currently started and not yet finished.
    private int _running = 0;

    // Number of items (as counted at construction) whose worker has not finished yet.
    private int _toBeCompleted = 0;

    /// <summary>
    /// Creates a runner that uses at most 5 concurrent worker threads.
    /// </summary>
    /// <param name="workerDelegate">Callback invoked once per queued item.</param>
    /// <param name="queue">Items to process; the runner dequeues from this instance.</param>
    public QueueRunner(ParameterizedThreadStart workerDelegate, Queue<T> queue)
        : this(workerDelegate, queue, 5)
    {
    }

    /// <summary>
    /// Creates a runner with an explicit limit on concurrent worker threads.
    /// </summary>
    /// <param name="workerDelegate">Callback invoked once per queued item.</param>
    /// <param name="queue">Items to process; the runner dequeues from this instance.</param>
    /// <param name="maxThreads">Maximum number of worker threads running at once; must be at least 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workerDelegate"/> or <paramref name="queue"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxThreads"/> is less than 1.</exception>
    public QueueRunner(ParameterizedThreadStart workerDelegate, Queue<T> queue, int maxThreads)
    {
        if (workerDelegate == null)
        {
            throw new ArgumentNullException("workerDelegate");
        }
        if (queue == null)
        {
            throw new ArgumentNullException("queue");
        }
        if (maxThreads < 1)
        {
            // With no available slots the dispatcher would wait forever for a pulse that never comes.
            throw new ArgumentOutOfRangeException("maxThreads", "At least one worker thread is required.");
        }

        _workerDelegate = workerDelegate;
        _maxThreads = maxThreads;
        _queue = queue;
        _toBeCompleted = _queue.Count;
    }

    /// <summary>
    /// Starts the dispatcher on a new thread and blocks the caller until every item
    /// counted at construction has been processed.
    /// </summary>
    public void RunAndWait()
    {
        new Thread(Start).Start();
        Wait();
    }

    /// <summary>
    /// Dispatcher loop: starts worker threads while slots and items are available,
    /// then waits for a worker to finish before trying again.
    /// </summary>
    private void Start()
    {
        lock (_syncLock)
        {
            while (_queue.Count > 0)
            {
                // Check the queue as well as the slot count: when fewer items remain than
                // free slots, dequeuing unconditionally would throw InvalidOperationException.
                while (_running < _maxThreads && _queue.Count > 0)
                {
                    new Thread(DelegateWrapper).Start(_queue.Dequeue());
                    _running++;
                }

                if (_queue.Count > 0)
                {
                    // All slots are busy; Monitor.Wait releases the lock until a worker pulses.
                    Monitor.Wait(_syncLock);
                }
            }
        }
    }

    /// <summary>
    /// Thread entry point for a single item: runs the worker delegate, then records completion.
    /// </summary>
    private void DelegateWrapper(object state)
    {
        try
        {
            _workerDelegate(state);
        }
        finally
        {
            // Record completion even if the delegate throws, so the counters stay accurate.
            DecrementCount();
        }
    }

    /// <summary>
    /// Blocks until the outstanding-item counter reaches zero.
    /// </summary>
    private void Wait()
    {
        lock (_syncLock)
        {
            while (_toBeCompleted > 0)
            {
                Monitor.Wait(_syncLock);
            }
        }
    }

    /// <summary>
    /// Marks one worker as finished and wakes both the dispatcher and the waiting caller.
    /// </summary>
    private void DecrementCount()
    {
        // Update the counters under the same lock that their readers hold.
        lock (_syncLock)
        {
            _running--;
            _toBeCompleted--;
            Monitor.PulseAll(_syncLock);
        }
    }
}
I'm Dusty Candland, a software developer in Colorado.
Email