Gal Segal's Blog

Thoughts of a programmer with a soul

Oct 24

Thread Safe High Performance Capped Queue


UPDATE – 27/2/2012

I updated the code again to reflect Otto’s comment. The main concern was that two threads could enter the “publishing” code simultaneously. The atomic check of whether the queue is currently publishing is done with Interlocked. Refer to Otto’s comments below for context. Thanks again, Otto!
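As a minimal standalone sketch of that Interlocked “gate” pattern (the `PublishGate` type and its member names are made up for illustration; they are not part of the post’s class):

```csharp
using System.Threading;

class PublishGate
{
	// 0 = idle, 1 = publishing. An int, not a bool, because
	// Interlocked.CompareExchange does not work with bool.
	private int _publishing;

	// Atomically: if the flag is 0, set it to 1 and return true.
	// Exactly one concurrent caller wins; the rest get false until Release().
	public bool TryEnter() =>
		Interlocked.CompareExchange(ref _publishing, 1, 0) == 0;

	// Atomically reset the flag so the next caller can enter.
	public void Release() => Interlocked.Exchange(ref _publishing, 0);
}
```

The point is that the check (“is anyone publishing?”) and the set (“I am publishing now”) happen as one atomic operation, which closes the race window a separate check-then-set would leave open.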

UPDATE – 26/2/2012

I updated the code to be a little safer – there was a problem with the private member “_onPublishExecuted” being set twice (see the comments). Thanks Otto!

Lately I have been busy building a new infrastructure for our logging system. I work on OpenBook – a website that gets a lot of traffic and involves many different business processes. I started logging errors, info data and more to files with the excellent log4net, but as the logs got bigger and longer it became almost impossible to track them. We also run several IIS servers behind a load balancer, so the result is a lot of logs spread across many different servers.

I wanted to create an infrastructure with the lowest possible footprint on the production servers that could still log a lot of messages to some storage (SQL Server, a NoSQL store, files…).

The main idea is this:

  1. Create a singleton/static queue that collects all messages.
  2. Once it holds X messages, or after X seconds, flush all messages to another data store.

Since we are dealing with a high-performance system, I couldn’t afford any locks in this mechanism: I saw that the “flush” action locked the queue, and all threads that tried to insert new messages were frozen. I tried using ReaderWriterLockSlim (see my post here), but it didn’t fit – there are far more writes than reads in this process.

I headed over to the new ConcurrentQueue&lt;T&gt; class. It implements a mostly lock-free, thread-safe queue, which was perfect for me. All I had to do was add the limits on message count and timeout, and that was pretty easy.
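For reference, the basic ConcurrentQueue&lt;T&gt; pattern the class below builds on – enqueue from any thread without an explicit lock, then drain with TryDequeue – looks like this (a generic sketch, not code from the post):

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

var queue = new ConcurrentQueue<string>();
queue.Enqueue("first");    // safe to call from any thread concurrently
queue.Enqueue("second");

// Drain the queue into a regular list; TryDequeue returns false when empty.
var drained = new List<string>();
string item;
while (queue.TryDequeue(out item))
	drained.Add(item);

// drained now holds "first", "second" in FIFO order
```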

So this is it:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

public abstract class CappedQueue<T> : ConcurrentQueue<T> where T : class
{
	protected int _capLimit;
	protected int _timeLimit;
	protected System.Timers.Timer _timer;
	// 0 = idle, 1 = publishing; an int (not a bool) so Interlocked can operate on it.
	protected int _onPublishExecuted;

	protected CappedQueue()
	{
		var config = (CappedQueueConfigSection)ConfigurationManager.GetSection("cappedQueue");
		Init(config.CapLimit, config.TimeLimitInSeconds);
	}

	protected CappedQueue(int capLimit, int timeLimit)
	{
		Init(capLimit, timeLimit);
	}

	public event Action<List<T>> OnPublish = delegate { };

	// "new" hides ConcurrentQueue<T>.Enqueue, so the cap check only runs when
	// callers go through the derived type (or a CappedQueue<T> reference).
	public virtual new void Enqueue(T item)
	{
		base.Enqueue(item);
		if (Count >= _capLimit)
		{
			Log4NetLogger.Log(eLogLevel.Debug, string.Format("Cap Limit: {0}", _capLimit));
			Publish();
		}
	}

	private void Init(int capLimit, int timeLimit)
	{
		_capLimit = capLimit;
		_timeLimit = timeLimit;
		InitTimer();
	}

	protected virtual void InitTimer()
	{
		_timer = new System.Timers.Timer();
		_timer.AutoReset = false;
		_timer.Interval = _timeLimit * 1000;
		_timer.Elapsed += new ElapsedEventHandler((s, e) =>
		{
			Log4NetLogger.Log(eLogLevel.Debug, string.Format("Time Limit: {0}", _timeLimit));
			Publish();
		});
		_timer.Start();
	}

	protected virtual void Publish()
	{
		Task task = new Task(() =>
		{
			// Tracks whether this thread actually claimed the publishing flag,
			// so the finally block doesn't reset a flag another thread owns.
			bool acquired = false;
			List<T> itemsToLog = new List<T>();
			try
			{
				if (IsPublishing())
					return;
				acquired = true;

				StartPublishing();

				Log4NetLogger.Log(eLogLevel.Debug, string.Format("Start Dequeue {0} items", Count));
				T item;
				while (TryDequeue(out item))
				{
					itemsToLog.Add(item);
				}
			}
			catch (ThreadAbortException tex)
			{
				Log4NetLogger.Log(eLogLevel.Error, "Dequeue items failed", tex);
			}
			catch (Exception ex)
			{
				Log4NetLogger.Log(eLogLevel.Error, "Dequeue items failed", ex);
			}
			finally
			{
				// Note: the early "return" above also runs this finally block,
				// so only the thread that acquired the flag publishes and resets it.
				if (acquired)
				{
					Log4NetLogger.Log(eLogLevel.Debug, string.Format("Dequeued {0} items", itemsToLog.Count));
					OnPublish(itemsToLog);
					CompletePublishing();
				}
			}
		});
		task.Start();
	}

	// Despite the name, this atomically checks *and sets* the flag: the first
	// caller claims it and gets false; later callers get true until the
	// current publish completes.
	private bool IsPublishing()
	{
		return (Interlocked.CompareExchange(ref _onPublishExecuted, 1, 0) > 0);
	}

	private void StartPublishing()
	{
		_timer.Stop();
	}

	private void CompletePublishing()
	{
		_timer.Start();
		Interlocked.Decrement(ref _onPublishExecuted);
	}
}

The CappedQueue takes 2 parameters in its constructor – one for the number of items and one for the interval (in seconds).
It also exposes an event of type Action&lt;List&lt;T&gt;&gt; that is fired asynchronously when one of the limits is reached.

2 other things to notice:

  1. When “Publish” fires, it runs as a task asynchronously to avoid blocking the calling threads.
  2. All messages in the queue are transferred to a new collection for further processing, and the main queue is free again.
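A concrete subclass might be wired up like this (a sketch only – `LogEntry`, `LogEntryQueue`, and `SaveToDatabase` are hypothetical names, not from the post; only `CappedQueue<T>`, `Enqueue`, and `OnPublish` come from the code above):

```csharp
public class LogEntry
{
	public string Message { get; set; }
}

// Hypothetical subclass: flush every 100 entries or every 30 seconds.
public class LogEntryQueue : CappedQueue<LogEntry>
{
	public LogEntryQueue() : base(100, 30) { }
}

// Usage:
// var queue = new LogEntryQueue();
// queue.OnPublish += entries => SaveToDatabase(entries); // your bulk insert
// queue.Enqueue(new LogEntry { Message = "something happened" });
```

Because `OnPublish` runs on a background task, the handler can do slow I/O (a bulk insert, a file append) without blocking the threads that call `Enqueue`.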

Happy coding :)


  • Otto Gebb

    What if two threads pass line 51 simultaneously?

    • http://gal-segal.com Gal Segal

      You are right. I updated the code to support read/write on the private member “_onPublishExecuted” wrapped with an extension method you can find in the previous post. Thanks!

      • Otto Gebb

        I’m not sure that solves the problem I was trying to point out. Two threads may still simultaneously enter the main publishing procedure (when they pass line 70 at the same time). In that case the order of published records is not guaranteed to be the same in which they were enqueued. I suggest using Interlocked here. It decreases readability, though, so you will probably want to add comments to those lines of code.
        1. Change the type of _onPublishExecuted from bool to int because Interlocked doesn’t work with bool.
        2. In the beginning of the “try” block (so that a ThreadAbortException occurring right after the exchange doesn’t leave the class in an inconsistent state):
        if (Interlocked.CompareExchange(ref _onPublishExecuted, 1, 0) > 0)
        {
        return;
        }
        3. In the finally block:
        Interlocked.Decrement(ref _onPublishExecuted);

        • http://gal-segal.com Gal Segal

          First, thanks for your thoughts. I changed the code as you suggested.

          • Otto Gebb

            Sorry to bother you again, but now there seems to be a bug in your code: an excessive Interlocked.Add. I suggested something slightly different. If you replace line 63 with _timer.Stop(); and remove lines 95-99 inclusive, the code will correspond to what I was thinking of. You don’t need the Add, because you check the flag indicating that the operation is being executed and, if not, set the flag atomically, in one line (92). Interlocked.CompareExchange does the job for you: both checking and setting the flag.

          • http://gal-segal.com Gal Segal

            Done. Again, many thanks.

          • Otto Gebb

            Now it looks correct to me. Thanks for sharing the code!