
UPDATE – 27/2/2012
I updated the code again to reflect Otto’s comment. The main concern was that two threads could enter the “publishing” code simultaneously. The atomic check of whether the queue is already publishing is done with Interlocked. Refer to Otto’s comments for context. Thanks again, Otto!
UPDATE – 26/2/2012
I updated the code to be a little safer: there was a problem with the private member “_onPublishExecuted” being handled twice (see the comments). Thanks Otto!
Lately I have been busy creating a new infrastructure for our logging system. I work on OpenBook, a website that gets a lot of traffic and involves many different business processes. I started logging errors, info data and more with the excellent log4net to files, but as the logs got bigger and longer it became almost impossible to track them. We also use several IIS servers behind a load balancer, so the logs end up scattered across many different servers.
I wanted to create an infrastructure with the lowest possible footprint on the production server that could still log a lot of messages to some storage (SQL Server, a NoSQL server, files…).
The main idea is this:
- Create a singleton/static queue that collects all messages
- Once it holds X messages, or after X seconds, it flushes all messages to another data store.
Since we are dealing with high-performance systems, I couldn’t afford any locks in this mechanism: I saw that the “flush” action locks the queue, and all threads that try to insert new messages are frozen. I tried using ReaderWriterLockSlim (see my post here), but it didn’t fit: there are far more writes than reads in this process.
I headed over to the new ConcurrentQueue&lt;T&gt; class. It implements a mostly lock-free, thread-safe queue, which was perfect for me. All I had to do was add the limits on the number of messages and the timeout, and that was pretty easy.
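As a quick illustration (my own sketch, not from the original post): the key API is TryDequeue, which atomically returns false when the queue is empty instead of forcing you to lock around a Count check and a Dequeue call:

```csharp
using System;
using System.Collections.Concurrent;

class ConcurrentQueueDemo
{
    static void Main()
    {
        var queue = new ConcurrentQueue<string>();

        // Enqueue never blocks other producers.
        queue.Enqueue("first");
        queue.Enqueue("second");

        // TryDequeue is the lock-free replacement for Dequeue:
        // it returns false instead of throwing when the queue is empty.
        string item;
        while (queue.TryDequeue(out item))
        {
            Console.WriteLine(item);
        }
    }
}
```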
So this is it:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

public abstract class CappedQueue&lt;T&gt; : ConcurrentQueue&lt;T&gt; where T : class
{
    protected int _capLimit;
    protected int _timeLimit;
    protected System.Timers.Timer _timer;
    protected int _onPublishExecuted;

    protected CappedQueue()
    {
        var config = (CappedQueueConfigSection)ConfigurationManager.GetSection("cappedQueue");
        Init(config.CapLimit, config.TimeLimitInSeconds);
    }

    protected CappedQueue(int capLimit, int timeLimit)
    {
        Init(capLimit, timeLimit);
    }

    public event Action&lt;List&lt;T&gt;&gt; OnPublish = delegate { };

    public virtual new void Enqueue(T item)
    {
        base.Enqueue(item);
        if (Count >= _capLimit)
        {
            Log4NetLogger.Log(eLogLevel.Debug, string.Format("Cap Limit: {0}", _capLimit));
            Publish();
        }
    }

    private void Init(int capLimit, int timeLimit)
    {
        _capLimit = capLimit;
        _timeLimit = timeLimit;
        InitTimer();
    }

    protected virtual void InitTimer()
    {
        _timer = new System.Timers.Timer();
        _timer.AutoReset = false;
        _timer.Interval = _timeLimit * 1000;
        _timer.Elapsed += new ElapsedEventHandler((s, e) =>
        {
            Log4NetLogger.Log(eLogLevel.Debug, string.Format("Time Limit: {0}", _timeLimit));
            Publish();
        });
        _timer.Start();
    }

    protected virtual void Publish()
    {
        Task task = new Task(() =>
        {
            // Atomic check: only one thread may publish at a time.
            // Bailing out here must NOT reach the finally block below,
            // otherwise the loser thread would fire OnPublish with an
            // empty list and decrement the flag the winner still holds.
            if (IsPublishing())
                return;

            StartPublishing();
            List&lt;T&gt; itemsToLog = new List&lt;T&gt;();
            try
            {
                Log4NetLogger.Log(eLogLevel.Debug, string.Format("Start Dequeue {0} items", Count));
                T item;
                while (TryDequeue(out item))
                {
                    itemsToLog.Add(item);
                }
            }
            catch (ThreadAbortException tex)
            {
                Log4NetLogger.Log(eLogLevel.Error, "Dequeue items failed", tex);
            }
            catch (Exception ex)
            {
                Log4NetLogger.Log(eLogLevel.Error, "Dequeue items failed", ex);
            }
            finally
            {
                Log4NetLogger.Log(eLogLevel.Debug, string.Format("Dequeued {0} items", itemsToLog.Count));
                OnPublish(itemsToLog);
                CompletePublishing();
            }
        });
        task.Start();
    }

    private bool IsPublishing()
    {
        // Interlocked.CompareExchange sets the flag to 1 only if it was 0
        // and returns the previous value - an atomic test-and-set.
        return Interlocked.CompareExchange(ref _onPublishExecuted, 1, 0) > 0;
    }

    private void StartPublishing()
    {
        _timer.Stop();
    }

    private void CompletePublishing()
    {
        _timer.Start();
        Interlocked.Decrement(ref _onPublishExecuted);
    }
}
The CappedQueue constructor takes two parameters: one for the number of items (the cap) and one for the time interval (in seconds).
It also exposes an event of type Action&lt;List&lt;T&gt;&gt; that is fired asynchronously when one of the limits has been reached.
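A minimal usage sketch (my own, hypothetical example: LogMessage, LogQueue, and the cap/interval values are placeholders, not part of the original code) showing how a subclass could pick its limits and how a consumer would subscribe to OnPublish:

```csharp
// Hypothetical subclass of the CappedQueue<T> shown above.
public class LogMessage
{
    public string Text { get; set; }
}

public class LogQueue : CappedQueue<LogMessage>
{
    // Flush after 500 messages or every 30 seconds,
    // whichever comes first.
    public LogQueue() : base(capLimit: 500, timeLimit: 30) { }
}

public static class LoggingBootstrap
{
    public static LogQueue Create()
    {
        var queue = new LogQueue();

        // Fired asynchronously with the drained batch; the batch
        // can then be written to SQL Server, a NoSQL store, files...
        queue.OnPublish += batch =>
        {
            foreach (var message in batch)
            {
                // persist message.Text to durable storage here
            }
        };

        return queue;
    }
}
```

Any thread can then call Enqueue on the shared instance; the producers never wait on the flush.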
Two other things to notice:
- When “Publish” fires, it runs a task asynchronously to avoid blocking the callers.
- All messages in the queue are transferred to a new collection for further processing, and the main queue is free again.
Happy coding :)