StatsManager: implemented queuing mechanism to fix thread contention issues. Code refactoring done.

This commit is contained in:
Shreyas Zare
2020-05-23 17:05:30 +05:30
parent 7a1e2a4396
commit ad25dee604

View File

@@ -54,6 +54,9 @@ namespace DnsServerCore.Dns
const int MAINTENANCE_TIMER_INITIAL_INTERVAL = 60000;
const int MAINTENANCE_TIMER_INTERVAL = 60000;
readonly ConcurrentQueue<StatsQueueItem> _queue = new ConcurrentQueue<StatsQueueItem>();
readonly Thread _consumerThread;
#endregion
#region constructor
@@ -80,8 +83,33 @@ namespace DnsServerCore.Dns
{
_log.Write(ex);
}
}, null, MAINTENANCE_TIMER_INITIAL_INTERVAL, MAINTENANCE_TIMER_INTERVAL);
//stats consumer thread
_consumerThread = new Thread(delegate ()
{
try
{
while (!_disposed)
{
while (_queue.TryDequeue(out StatsQueueItem item))
{
StatCounter statCounter = _lastHourStatCounters[item._dateTime.Minute];
if (statCounter != null)
statCounter.Update(item._query, item._responseType, item._responseTag, item._clientIpAddress);
}
Thread.Sleep(100);
}
}
catch (Exception ex)
{
_log.Write(ex);
}
});
_consumerThread.IsBackground = true;
_consumerThread.Start();
}
#endregion
@@ -340,14 +368,6 @@ namespace DnsServerCore.Dns
}
}
private void Update(DnsQuestionRecord query, StatsResponseType responseType, object responseTag, IPAddress clientIpAddress)
{
StatCounter statCounter = _lastHourStatCounters[DateTime.UtcNow.Minute];
if (statCounter != null)
statCounter.Update(query, responseType, responseTag, clientIpAddress);
}
#endregion
#region public
@@ -356,7 +376,7 @@ namespace DnsServerCore.Dns
{
StatsResponseType responseType;
switch (response.Header.RCODE)
switch (response.RCODE)
{
case DnsResponseCode.NoError:
responseType = StatsResponseType.NoError;
@@ -378,10 +398,14 @@ namespace DnsServerCore.Dns
return;
}
if (response.Header.QDCOUNT > 0)
Update(response.Question[0], responseType, response.Tag, clientIpAddress);
StatsQueueItem item;
if (response.QDCOUNT > 0)
item = new StatsQueueItem(response.Question[0], responseType, response.Tag as string, clientIpAddress);
else
Update(new DnsQuestionRecord("", DnsResourceRecordType.ANY, DnsClass.IN), responseType, response.Tag, clientIpAddress);
item = new StatsQueueItem(new DnsQuestionRecord("", DnsResourceRecordType.ANY, DnsClass.IN), responseType, response.Tag as string, clientIpAddress);
_queue.Enqueue(item);
}
public Dictionary<string, List<KeyValuePair<string, int>>> GetLastHourStats()
@@ -965,11 +989,11 @@ namespace DnsServerCore.Dns
int _totalCacheHit;
int _totalBlocked;
readonly ConcurrentDictionary<string, Counter> _queryDomains = new ConcurrentDictionary<string, Counter>(100, 100);
readonly ConcurrentDictionary<string, Counter> _queryBlockedDomains = new ConcurrentDictionary<string, Counter>(100, 100);
readonly ConcurrentDictionary<DnsResourceRecordType, Counter> _queryTypes = new ConcurrentDictionary<DnsResourceRecordType, Counter>(100, 10);
readonly ConcurrentDictionary<IPAddress, Counter> _clientIpAddresses = new ConcurrentDictionary<IPAddress, Counter>(100, 100);
readonly ConcurrentDictionary<DnsQuestionRecord, Counter> _queries = new ConcurrentDictionary<DnsQuestionRecord, Counter>(100, 100);
readonly ConcurrentDictionary<string, Counter> _queryDomains = new ConcurrentDictionary<string, Counter>();
readonly ConcurrentDictionary<string, Counter> _queryBlockedDomains = new ConcurrentDictionary<string, Counter>();
readonly ConcurrentDictionary<DnsResourceRecordType, Counter> _queryTypes = new ConcurrentDictionary<DnsResourceRecordType, Counter>();
readonly ConcurrentDictionary<IPAddress, Counter> _clientIpAddresses = new ConcurrentDictionary<IPAddress, Counter>();
readonly ConcurrentDictionary<DnsQuestionRecord, Counter> _queries = new ConcurrentDictionary<DnsQuestionRecord, Counter>();
#endregion
@@ -1046,7 +1070,7 @@ namespace DnsServerCore.Dns
#region private
private List<KeyValuePair<string, int>> GetTopList(List<KeyValuePair<string, int>> list, int limit)
private static List<KeyValuePair<string, int>> GetTopList(List<KeyValuePair<string, int>> list, int limit)
{
list.Sort(delegate (KeyValuePair<string, int> item1, KeyValuePair<string, int> item2)
{
@@ -1059,6 +1083,11 @@ namespace DnsServerCore.Dns
return list;
}
private static Counter GetNewCounter<T>(T key)
{
return new Counter();
}
#endregion
#region public
@@ -1068,7 +1097,7 @@ namespace DnsServerCore.Dns
_locked = true;
}
public void Update(DnsQuestionRecord query, StatsResponseType responseType, object responseTag, IPAddress clientIpAddress)
public void Update(DnsQuestionRecord query, StatsResponseType responseType, string responseTag, IPAddress clientIpAddress)
{
if (_locked)
return;
@@ -1076,6 +1105,8 @@ namespace DnsServerCore.Dns
if (clientIpAddress.IsIPv4MappedToIPv6)
clientIpAddress = clientIpAddress.MapToIPv4();
string domain = query.Name.ToLower();
Interlocked.Increment(ref _totalQueries);
switch (responseType)
@@ -1083,8 +1114,8 @@ namespace DnsServerCore.Dns
case StatsResponseType.NoError:
if (!"blocked".Equals(responseTag)) //skip blocked domains
{
_queryDomains.GetOrAdd(query.Name.ToLower(), new Counter()).Increment();
_queries.GetOrAdd(query, new Counter()).Increment();
_queryDomains.GetOrAdd(domain, GetNewCounter).Increment();
_queries.GetOrAdd(query, GetNewCounter).Increment();
}
Interlocked.Increment(ref _totalNoError);
@@ -1118,13 +1149,13 @@ namespace DnsServerCore.Dns
break;
case "blocked":
_queryBlockedDomains.GetOrAdd(query.Name.ToLower(), new Counter()).Increment();
_queryBlockedDomains.GetOrAdd(domain, GetNewCounter).Increment();
Interlocked.Increment(ref _totalBlocked);
break;
}
_queryTypes.GetOrAdd(query.Type, new Counter()).Increment();
_clientIpAddresses.GetOrAdd(clientIpAddress, new Counter()).Increment();
_queryTypes.GetOrAdd(query.Type, GetNewCounter).Increment();
_clientIpAddresses.GetOrAdd(clientIpAddress, GetNewCounter).Increment();
}
public void Merge(StatCounter statCounter)
@@ -1144,19 +1175,19 @@ namespace DnsServerCore.Dns
_totalBlocked += statCounter._totalBlocked;
foreach (KeyValuePair<string, Counter> queryDomain in statCounter._queryDomains)
_queryDomains.GetOrAdd(queryDomain.Key, new Counter()).Merge(queryDomain.Value);
_queryDomains.GetOrAdd(queryDomain.Key, GetNewCounter).Merge(queryDomain.Value);
foreach (KeyValuePair<string, Counter> queryBlockedDomain in statCounter._queryBlockedDomains)
_queryBlockedDomains.GetOrAdd(queryBlockedDomain.Key, new Counter()).Merge(queryBlockedDomain.Value);
_queryBlockedDomains.GetOrAdd(queryBlockedDomain.Key, GetNewCounter).Merge(queryBlockedDomain.Value);
foreach (KeyValuePair<DnsResourceRecordType, Counter> queryType in statCounter._queryTypes)
_queryTypes.GetOrAdd(queryType.Key, new Counter()).Merge(queryType.Value);
_queryTypes.GetOrAdd(queryType.Key, GetNewCounter).Merge(queryType.Value);
foreach (KeyValuePair<IPAddress, Counter> clientIpAddress in statCounter._clientIpAddresses)
_clientIpAddresses.GetOrAdd(clientIpAddress.Key, new Counter()).Merge(clientIpAddress.Value);
_clientIpAddresses.GetOrAdd(clientIpAddress.Key, GetNewCounter).Merge(clientIpAddress.Value);
foreach (KeyValuePair<DnsQuestionRecord, Counter> query in statCounter._queries)
_queries.GetOrAdd(query.Key, new Counter()).Merge(query.Value);
_queries.GetOrAdd(query.Key, GetNewCounter).Merge(query.Value);
}
public void WriteTo(BinaryWriter bW)
@@ -1365,5 +1396,31 @@ namespace DnsServerCore.Dns
#endregion
}
}
class StatsQueueItem
{
#region variables
public readonly DateTime _dateTime;
public readonly DnsQuestionRecord _query;
public readonly StatsResponseType _responseType;
public readonly string _responseTag;
public readonly IPAddress _clientIpAddress;
#endregion
#region constructor
public StatsQueueItem(DnsQuestionRecord query, StatsResponseType responseType, string responseTag, IPAddress clientIpAddress)
{
_dateTime = DateTime.UtcNow;
_query = query;
_responseType = responseType;
_responseTag = responseTag;
_clientIpAddress = clientIpAddress;
}
#endregion
}
}
}