Thread-Safe Observable Priority Queue for WPF

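Building on my last post, I realized that you couldn’t push elements onto the queue from a worker thread: the CollectionChanged notification would be raised on that worker thread, while controls bound to the queue expect it on the UI thread. That made the queue pretty much useless for background work. However, if we dispatch the pushes (and pops) back to the UI thread, it should work, right?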

Here’s (what I believe to be) a thread-safe observable priority queue with notifications for use in WPF.

/// <summary>
/// Observable priority queue that marshals <see cref="Push"/>, <see cref="Peek"/> and
/// <see cref="Pop"/> onto one <see cref="Dispatcher"/> thread, so that
/// <see cref="CollectionChanged"/> is always raised on that thread.
/// Higher priorities are served first; items sharing a priority are served in insertion order.
/// </summary>
/// <remarks>
/// <see cref="Count"/>, <see cref="Empty"/> and enumeration read the underlying storage directly
/// without any marshalling or locking; call them from the dispatcher thread.
/// </remarks>
/// <typeparam name="TValue">Element type.</typeparam>
/// <typeparam name="TPriority">Priority type; larger values (per CompareTo) come out first.</typeparam>
class PriorityQueue<TValue, TPriority> : IEnumerable<TValue>, INotifyCollectionChanged
    where TValue : INotifyPropertyChanged
    where TPriority : IComparable
{
    // One FIFO queue per distinct priority, ordered from highest to lowest priority.
    private readonly SortedDictionary<TPriority, Queue<TValue>> dict;
    private readonly Dispatcher dispatcher;
    private int count;

    /// <summary>Total number of queued items across all priorities.</summary>
    public int Count { get { return count; } }

    /// <summary>True when the queue holds no items.</summary>
    public bool Empty { get { return Count == 0; } }

    /// <summary>Creates an empty queue.</summary>
    /// <param name="dispatcher">
    /// Dispatcher whose thread performs all mutations; when null, the dispatcher of the
    /// constructing thread is used.
    /// </param>
    public PriorityQueue(Dispatcher dispatcher = null)
    {
        this.dispatcher = dispatcher ?? Dispatcher.CurrentDispatcher;
        this.count = 0;
        this.dict = new SortedDictionary<TPriority, Queue<TValue>>(new ReverseComparer());
    }

    /// <summary>Inverts the natural ordering so the dictionary enumerates highest priority first.</summary>
    private class ReverseComparer : IComparer<TPriority>
    {
        public int Compare(TPriority x, TPriority y) { return y.CompareTo(x); }
    }

    /// <summary>Adds <paramref name="val"/> behind every item of equal or higher priority.</summary>
    /// <param name="val">Item to enqueue.</param>
    /// <param name="pri">Priority of the item; defaults to default(TPriority).</param>
    public virtual void Push(TValue val, TPriority pri = default(TPriority))
    {
        if (!dispatcher.CheckAccess())
        {
            // Re-enter on the dispatcher thread and block until the push has completed.
            dispatcher.Invoke(new Action<TValue, TPriority>(Push), DispatcherPriority.Send, val, pri);
            return;
        }

        Queue<TValue> queue;
        if (!dict.TryGetValue(pri, out queue))
        {
            queue = new Queue<TValue>();
            dict.Add(pri, queue);
        }
        queue.Enqueue(val);

        // Counted only after the item is stored, so a failed lookup (e.g. a null key)
        // cannot leave Count out of step with the contents.
        ++count;
        OnCollectionChanged(new NotifyCollectionChangedEventArgs(
            NotifyCollectionChangedAction.Add, val, IndexOfNewest(queue)));
    }

    /// <summary>Returns the next item to be popped without removing it.</summary>
    /// <exception cref="InvalidOperationException">The queue is empty.</exception>
    public virtual TValue Peek()
    {
        if (!dispatcher.CheckAccess())
        {
            // Read on the dispatcher thread so the dictionary is not inspected mid-mutation.
            return (TValue)dispatcher.Invoke(new Func<TValue>(Peek), DispatcherPriority.Send);
        }

        ThrowIfEmpty();
        return dict.First().Value.Peek();
    }

    /// <summary>Removes and returns the oldest item of the highest priority present.</summary>
    /// <exception cref="InvalidOperationException">The queue is empty.</exception>
    public virtual TValue Pop()
    {
        if (!dispatcher.CheckAccess())
        {
            return (TValue)dispatcher.Invoke(new Func<TValue>(Pop), DispatcherPriority.Send);
        }

        // Checked before anything is touched: a failed Pop must not change Count.
        ThrowIfEmpty();

        var head = dict.First();
        var val = head.Value.Dequeue();
        if (head.Value.Count == 0)
        {
            // Drop exhausted priorities so First() always lands on a non-empty queue.
            dict.Remove(head.Key);
        }
        --count;

        // The popped item is always the first one in enumeration order, hence index 0.
        OnCollectionChanged(new NotifyCollectionChangedEventArgs(
            NotifyCollectionChangedAction.Remove, val, 0));
        return val;
    }

    /// <summary>Raised after an item has been pushed (Add) or popped (Remove).</summary>
    public event NotifyCollectionChangedEventHandler CollectionChanged;

    /// <summary>Raises <see cref="CollectionChanged"/> with the given arguments.</summary>
    /// <param name="e">Description of the change.</param>
    public virtual void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
    {
        // Copy first: the field may be cleared by an unsubscribe between the check and the call.
        var handler = CollectionChanged;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    /// <summary>Enumerates items in the order they would be popped.</summary>
    public IEnumerator<TValue> GetEnumerator()
    {
        foreach (var queue in dict.Values)
        {
            foreach (var value in queue)
            {
                yield return value;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    /// <summary>Throws the same exception type an empty First() would, but before any state changes.</summary>
    private void ThrowIfEmpty()
    {
        if (dict.Count == 0)
        {
            throw new InvalidOperationException("The priority queue is empty.");
        }
    }

    /// <summary>Position, in enumeration order, of the item most recently enqueued into <paramref name="target"/>.</summary>
    private int IndexOfNewest(Queue<TValue> target)
    {
        int index = target.Count - 1;
        foreach (var queue in dict.Values)
        {
            if (ReferenceEquals(queue, target))
            {
                break;
            }
            // Every item in a higher-priority queue is enumerated before the new one.
            index += queue.Count;
        }
        return index;
    }
}

Read this blog post to find out how this works.

Observable Priority Queue

Just started playing around with WPF in VS 2010. They have this ObservableCollection class which you can bind to your DataGrid or ListControl and then when you add or remove items from it, the control is refreshed automatically. However, I wanted to use my PriorityQueue class that I posted about earlier, so I modified it a bit:

/// <summary>
/// Convenience form of the two-parameter queue for the common case of integer priorities.
/// </summary>
/// <typeparam name="TValue">Element type.</typeparam>
class PriorityQueue<TValue> : PriorityQueue<TValue, int>
    where TValue : INotifyPropertyChanged
{
}
/// <summary>
/// Priority queue that raises <see cref="CollectionChanged"/> whenever an item is enqueued or
/// dequeued. Higher priorities are served first; items sharing a priority are served in
/// insertion order.
/// </summary>
/// <remarks>
/// No synchronization is performed; events are raised on whichever thread calls
/// <see cref="Enqueue"/> or <see cref="Dequeue"/>.
/// </remarks>
/// <typeparam name="TValue">Element type.</typeparam>
/// <typeparam name="TPriority">Priority type; larger values (per CompareTo) come out first.</typeparam>
class PriorityQueue<TValue, TPriority> : IEnumerable<TValue>, INotifyCollectionChanged
    where TValue : INotifyPropertyChanged
    where TPriority : IComparable
{
    // One FIFO queue per distinct priority, ordered from highest to lowest priority.
    private readonly SortedDictionary<TPriority, Queue<TValue>> dict = new SortedDictionary<TPriority, Queue<TValue>>(new ReverseComparer());
    private int count = 0;

    /// <summary>Total number of queued items across all priorities.</summary>
    public int Count { get { return count; } }

    /// <summary>True when the queue holds no items.</summary>
    public bool Empty { get { return Count == 0; } }

    /// <summary>Inverts the natural ordering so the dictionary enumerates highest priority first.</summary>
    private class ReverseComparer : IComparer<TPriority>
    {
        public int Compare(TPriority x, TPriority y) { return y.CompareTo(x); }
    }

    /// <summary>Adds <paramref name="val"/> behind every item of equal or higher priority.</summary>
    /// <param name="val">Item to enqueue.</param>
    /// <param name="pri">Priority of the item; defaults to default(TPriority).</param>
    public void Enqueue(TValue val, TPriority pri = default(TPriority))
    {
        Queue<TValue> queue;
        if (!dict.TryGetValue(pri, out queue))
        {
            queue = new Queue<TValue>();
            dict.Add(pri, queue);
        }
        queue.Enqueue(val);

        // Counted only after the item is stored, so a failed lookup (e.g. a null key)
        // cannot leave Count out of step with the contents.
        ++count;

        // Position in enumeration order: everything in higher-priority queues comes first,
        // then the earlier items of this priority.
        int index = queue.Count - 1;
        foreach (var other in dict.Values)
        {
            if (ReferenceEquals(other, queue))
            {
                break;
            }
            index += other.Count;
        }

        OnCollectionChanged(new NotifyCollectionChangedEventArgs(
            NotifyCollectionChangedAction.Add, val, index));
    }

    /// <summary>Removes and returns the oldest item of the highest priority present.</summary>
    /// <exception cref="InvalidOperationException">The queue is empty.</exception>
    public TValue Dequeue()
    {
        // Checked before anything is touched: a failed Dequeue must not change Count.
        if (dict.Count == 0)
        {
            throw new InvalidOperationException("The priority queue is empty.");
        }

        var head = dict.First();
        var val = head.Value.Dequeue();
        if (head.Value.Count == 0)
        {
            // Drop exhausted priorities so First() always lands on a non-empty queue.
            dict.Remove(head.Key);
        }
        --count;

        // The dequeued item is always the first one in enumeration order, hence index 0.
        OnCollectionChanged(new NotifyCollectionChangedEventArgs(
            NotifyCollectionChangedAction.Remove, val, 0));
        return val;
    }

    /// <summary>Raised after an item has been enqueued (Add) or dequeued (Remove).</summary>
    public event NotifyCollectionChangedEventHandler CollectionChanged;

    /// <summary>Raises <see cref="CollectionChanged"/> with the given arguments.</summary>
    /// <param name="e">Description of the change.</param>
    public virtual void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
    {
        // Copy first: the field may be cleared by an unsubscribe between the check and the call.
        var handler = CollectionChanged;
        if (handler != null)
        {
            handler(this, e);
        }
    }

    /// <summary>Enumerates items in the order they would be dequeued.</summary>
    public IEnumerator<TValue> GetEnumerator()
    {
        foreach (var queue in dict.Values)
        {
            foreach (var value in queue)
            {
                yield return value;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

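Priorities are sorted in descending order (higher value = higher priority). Items that share a priority are dequeued in the order they were added (FIFO), because each priority has its own Queue.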

Also discovered that I could just use “yield return” instead of having to write a custom Enumerator class too! Very nice. Especially since I wouldn’t have known how to write it :)