转载一篇老外的消息队列服务文章
接着上一篇:http://blog.csdn.net/joyhen/article/details/38534569
原文:http://www.codeproject.com/Articles/821220/Throwing-a-Great-Block
using System;
namespace MessagingQueueing
{
using System.Collections.Concurrent;
using System.Threading;
/// <summary>
/// Console demo: pushes a fixed batch of numbered messages through a
/// <see cref="MyQueue"/> and then waits for a key press before exiting.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. Enqueues messages with IDs 1 through 20, disposes the queue
    /// (leaving the using block), and then blocks on a key press.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        const int lastMessageId = 20;

        using (MyQueue pipeline = new MyQueue())
        {
            int id = 1;
            while (id <= lastMessageId)
            {
                MyMessage entry = new MyMessage();
                entry.MessageId = id;
                entry.Message = string.Format("Message text # {0:#,##0}", id);

                pipeline.AddLog(entry);
                id++;
            }
        }

        // Keep the console window open until the user presses a key.
        Console.ReadKey();
    }
}
/// <summary>
/// A producer/consumer message queue. Callers enqueue messages with
/// <see cref="AddLog"/>; a dedicated worker thread started by the constructor
/// dequeues them and writes them to the console. <see cref="Dispose"/> stops
/// accepting new messages, waits for the worker to drain the backlog, and then
/// releases the underlying collection.
/// </summary>
/// <remarks>
/// The class owns only managed resources, so it has no finalizer: shutdown has
/// to block on the worker thread and touch other managed objects, neither of
/// which is safe to do on the finalizer thread. Callers must call
/// <see cref="Dispose"/> (for example via a using block).
/// </remarks>
class MyQueue : IDisposable
{
    // How long each Join attempt waits for the worker before the backlog is re-checked.
    private const int ShutdownTimeoutMilliseconds = 30000;

    private readonly BlockingCollection<MyMessage> messageQueue;
    private readonly Thread dequeueThread;

    // Set on the first Dispose call so later calls are no-ops.
    private bool disposed;

    /// <summary>
    /// Creates the queue and starts the worker thread that consumes it.
    /// </summary>
    public MyQueue()
    {
        messageQueue = new BlockingCollection<MyMessage>(new ConcurrentQueue<MyMessage>());
        dequeueThread = new Thread(new ThreadStart(DequeueMessageThread));
        dequeueThread.Name = "TransactionPostThread";
        dequeueThread.Start();
    }

    /// <summary>
    /// Worker thread body: writes each queued message to the console until the
    /// queue has been marked complete and is empty.
    /// </summary>
    private void DequeueMessageThread()
    {
        // GetConsumingEnumerable blocks while the queue is empty and finishes the
        // enumeration once CompleteAdding has been called and every item has been
        // taken, so shutdown needs no exception to break out of the loop.
        foreach (MyMessage message in messageQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Dequeueing: " + message.ToString());
        }
    }

    /// <summary>
    /// Logs the message to the console and adds it to the queue for the worker thread.
    /// </summary>
    /// <param name="message">The message to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The queue has been shut down by <see cref="Dispose"/> (this includes the
    /// derived <see cref="ObjectDisposedException"/>).
    /// </exception>
    public void AddLog(MyMessage message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        Console.WriteLine("Enqueueing: " + message.ToString());
        messageQueue.Add(message);
    }

    /// <summary>
    /// Shuts the queue down: stops accepting messages, waits for the worker thread
    /// to finish the backlog, and disposes the underlying collection. Calling it
    /// more than once is harmless. Not intended to be called concurrently from
    /// several threads.
    /// </summary>
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }
        disposed = true;

        Console.WriteLine("Shutting down queue. Waiting for dequeue thread completion.");

        // Signal the worker that no more items will arrive; its loop ends once the queue is empty.
        messageQueue.CompleteAdding();

        if (WaitForWorkerExit())
        {
            Console.WriteLine("Dequeue thread complete.");

            // Only safe once the worker is gone; an abandoned worker may still touch the collection.
            messageQueue.Dispose();
        }

        // No finalizer is declared here; this only matters for a derived type that adds one.
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Waits for the worker thread to exit, re-checking the backlog after every timeout.
    /// </summary>
    /// <returns>
    /// true if the worker thread has exited; false if it was abandoned because it
    /// was still running with an empty queue.
    /// </returns>
    private bool WaitForWorkerExit()
    {
        while (!dequeueThread.Join(ShutdownTimeoutMilliseconds))
        {
            if (messageQueue.Count != 0)
            {
                // Items are still queued, so the worker is presumably still draining; keep waiting.
                continue;
            }

            // Queue is empty but the worker has not returned: treat it as stuck.
            // Thread.Abort is unsupported on current .NET runtimes, so instead mark the
            // thread as a background thread so that it cannot keep the process alive.
            System.Diagnostics.Debug.Print("Abandoning stuck dequeue thread");
            try
            {
                dequeueThread.IsBackground = true;
            }
            catch (ThreadStateException)
            {
                // The worker exited between the Join timeout and the assignment above.
                return true;
            }

            return false;
        }

        return true;
    }
}
/// <summary>
/// A queued message: a numeric identifier plus its text payload.
/// </summary>
class MyMessage
{
    /// <summary>Numeric identifier of the message.</summary>
    public int MessageId { get; set; }

    /// <summary>Text payload of the message.</summary>
    public string Message { get; set; }

    /// <summary>
    /// Builds a one-line description containing the ID (formatted with a
    /// thousands separator) and the text payload.
    /// </summary>
    /// <returns>The formatted description.</returns>
    public override string ToString()
    {
        const string template = "Message with ID {0:#,##0} and value {1}.";
        object[] parts = new object[] { MessageId, Message };
        string description = string.Format(template, parts);
        return description;
    }
}
}
还没有评论,来说两句吧...