Building on my last post, I realized that you couldn’t push elements onto the queue from a worker thread, making it pretty much useless. However, if we dispatch the pushes back to the UI thread, it should work, right?
Here’s (what I believe to be) a thread-safe observable priority queue with notifications for use in WPF.
class PriorityQueue<TValue, TPriority> : IEnumerable<TValue>, INotifyCollectionChanged
where TValue : INotifyPropertyChanged
where TPriority : IComparable
{
private SortedDictionary<TPriority, Queue<TValue>> dict;
private int count;
private Dispatcher dispatcher;
public int Count { get { return count; } }
public bool Empty { get { return Count == 0; } }
public PriorityQueue(Dispatcher dispatcher = null)
{
this.dispatcher = dispatcher ?? Dispatcher.CurrentDispatcher;
this.count = 0;
this.dict = new SortedDictionary<TPriority, Queue<TValue>>(new ReverseComparer());
}
private class ReverseComparer : IComparer<TPriority>
{
public int Compare(TPriority x, TPriority y) { return y.CompareTo(x); }
}
public virtual void Push(TValue val, TPriority pri = default(TPriority))
{
if (dispatcher.CheckAccess())
{
++count;
if (!dict.ContainsKey(pri)) dict[pri] = new Queue<TValue>();
dict[pri].Enqueue(val);
OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, val));
}
else
{
dispatcher.Invoke(new Action<TValue, TPriority>(Push), DispatcherPriority.Send, val, pri);
}
}
public virtual TValue Peek()
{
return dict.First().Value.Peek();
}
public virtual TValue Pop()
{
if (dispatcher.CheckAccess())
{
--count;
var pair = dict.First();
var queue = pair.Value;
var val = queue.Dequeue();
if (queue.Count == 0) dict.Remove(pair.Key);
OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, val));
return val;
}
else
{
return (TValue)dispatcher.Invoke(new Func<TValue>(Pop), DispatcherPriority.Send);
}
}
public event NotifyCollectionChangedEventHandler CollectionChanged;
public virtual void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
{
if (CollectionChanged != null)
{
CollectionChanged(this, e);
}
}
public IEnumerator<TValue> GetEnumerator()
{
foreach (var queue in dict.Values)
{
foreach (var value in queue)
{
yield return value;
}
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
where TValue : INotifyPropertyChanged
where TPriority : IComparable
{
private SortedDictionary<TPriority, Queue<TValue>> dict;
private int count;
private Dispatcher dispatcher;
public int Count { get { return count; } }
public bool Empty { get { return Count == 0; } }
public PriorityQueue(Dispatcher dispatcher = null)
{
this.dispatcher = dispatcher ?? Dispatcher.CurrentDispatcher;
this.count = 0;
this.dict = new SortedDictionary<TPriority, Queue<TValue>>(new ReverseComparer());
}
private class ReverseComparer : IComparer<TPriority>
{
public int Compare(TPriority x, TPriority y) { return y.CompareTo(x); }
}
public virtual void Push(TValue val, TPriority pri = default(TPriority))
{
if (dispatcher.CheckAccess())
{
++count;
if (!dict.ContainsKey(pri)) dict[pri] = new Queue<TValue>();
dict[pri].Enqueue(val);
OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, val));
}
else
{
dispatcher.Invoke(new Action<TValue, TPriority>(Push), DispatcherPriority.Send, val, pri);
}
}
public virtual TValue Peek()
{
return dict.First().Value.Peek();
}
public virtual TValue Pop()
{
if (dispatcher.CheckAccess())
{
--count;
var pair = dict.First();
var queue = pair.Value;
var val = queue.Dequeue();
if (queue.Count == 0) dict.Remove(pair.Key);
OnCollectionChanged(new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, val));
return val;
}
else
{
return (TValue)dispatcher.Invoke(new Func<TValue>(Pop), DispatcherPriority.Send);
}
}
public event NotifyCollectionChangedEventHandler CollectionChanged;
public virtual void OnCollectionChanged(NotifyCollectionChangedEventArgs e)
{
if (CollectionChanged != null)
{
CollectionChanged(this, e);
}
}
public IEnumerator<TValue> GetEnumerator()
{
foreach (var queue in dict.Values)
{
foreach (var value in queue)
{
yield return value;
}
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
Read this blog post to find out how this works.
6 thoughts on “Thread-Safe Observable Priority Queue for WPF”
could u teach me this?? looks cool 🙂
Stephanie
stephanie? is that your internet name now p cat? this isn’t exactly beginner material…we’d better start you off somewhere a little more basic.
Mark
p cat strikes again!
Stephanie
Clear method missing?
Anathol
@Anathol: I guess I didn’t need one when I was building this. It should be much like the Pop() method though, except you can just do `dict.Clear()`, and the event should be `NotifyCollectionChangedAction.Reset` I think.
Mark
Should the add and delete use a lock?
I think there is a potential race condition.
Paul