Thread-Safe Observable Priority Queue for WPF

Building on my last post, I realized that you couldn’t push elements onto the queue from a worker thread, making it pretty much useless. However, if we dispatch the pushes back to the UI thread, it should work, right?

Here’s (what I believe to be) a thread-safe observable priority queue with notifications for use in WPF.

class PriorityQueue<TValue, TPriority> : IEnumerable<TValue>, INotifyCollectionChanged
    where TValue : INotifyPropertyChanged
    where TPriority : IComparable
{
    private SortedDictionary<TPriority, Queue<TValue>> dict;
    private int count;
    private Dispatcher dispatcher;

    public int Count { get { return count; } }
    public bool Empty { get { return Count == 0; } }

    public PriorityQueue(Dispatcher dispatcher = null)
    {
        this.dispatcher = dispatcher ?? Dispatcher.CurrentDispatcher;
        this.count = 0;
        this.dict = new SortedDictionary<TPriority, Queue<TValue>>(new ReverseComparer());
    }

    private class ReverseComparer : IComparer<TPriority>
    {
        public int Compare(TPriority x, TPriority y) { return y.CompareTo(x); }
    }

    public virtual void Push(TValue val, TPriority pri = default(TPriority))
    {
        if (dispatcher.CheckAccess())
        {
            ++count;
            if (!dict.ContainsKey(pri)) dict[pri] = new Queue<TValue>();
            dict[pri].Enqueue(val);
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, val));
        }
        else
        {
            dispatcher.Invoke(new Action<TValue, TPriority>(Push), DispatcherPriority.Send, val, pri);
        }
    }

    public virtual TValue Peek()
    {
        return dict.First().Value.Peek();
    }

    public virtual TValue Pop()
    {
        if (dispatcher.CheckAccess())
        {
            --count;
            var pair = dict.First();
            var queue = pair.Value;
            var val = queue.Dequeue();
            if (queue.Count == 0) dict.Remove(pair.Key);
            OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, val));
            return val;
        }
        else
        {
            return (TValue)dispatcher.Invoke(new Func<TValue>(Pop), DispatcherPriority.Send);
        }
    }

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    public virtual void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
    {
        if (CollectionChanged != null)
        {
            CollectionChanged(this, e);
        }
    }

    public IEnumerator<TValue> GetEnumerator()
    {
        foreach (var queue in dict.Values)
        {
            foreach (var value in queue)
            {
                yield return value;
            }
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

Read this blog post to find out how this works.

6 thoughts on “Thread-Safe Observable Priority Queue for WPF

  1. stephanie? is that your internet name now p cat? this isn’t exactly beginner material…we’d better start you off somewhere a little more basic.

  2. @Anathol: I guess I didn’t need one when I was building this. It should be much like the Pop() method though, except you can just do `dict.Clear()`, and the event should be `NotifyCollectionChangedAction.Reset` I think.

  3. Should the add and delete use a lock?
    I think there is a potential race condition.

Leave a Reply

Your email address will not be published. Required fields are marked *