Performance improvements

Signed-off-by: Zafer Balkan <zafer@zaferbalkan.com>
This commit is contained in:
Zafer Balkan
2024-10-03 15:14:29 +03:00
parent 7425191a50
commit fb42498485
10 changed files with 309 additions and 123 deletions

View File

@@ -41,22 +41,20 @@ namespace LogExporter
private const int QUEUE_TIMER_INTERVAL = 10000;
private readonly IReadOnlyList<DnsLogEntry> _emptyList = [];
private readonly ExportManager _exportManager = new ExportManager();
private BlockingCollection<LogEntry> _logBuffer;
private readonly object _queueTimerLock = new object();
private BufferManagementConfig _config;
private BufferManagementConfig? _config;
private IDnsServer _dnsServer;
private BlockingCollection<LogEntry> _logBuffer;
private Timer _queueTimer;
private bool disposedValue;
private readonly IReadOnlyList<DnsLogEntry> _emptyList = [];
#endregion variables
#region constructor
@@ -82,10 +80,7 @@ namespace LogExporter
{
if (disposing)
{
lock (_queueTimerLock)
{
_queueTimer?.Dispose();
}
_queueTimer?.Dispose();
ExportLogsAsync().Sync(); //flush any pending logs
@@ -120,22 +115,10 @@ namespace LogExporter
}
RegisterExportTargets();
lock (_queueTimerLock)
if (_exportManager.HasStrategy())
{
_queueTimer = new Timer(async (object _) =>
{
try
{
await ExportLogsAsync();
}
catch (Exception ex)
{
_dnsServer.WriteLog(ex);
}
}, null, QUEUE_TIMER_INTERVAL, Timeout.Infinite);
_queueTimer = new Timer(HandleExportLogCallback, state: null, QUEUE_TIMER_INTERVAL, Timeout.Infinite);
}
return Task.CompletedTask;
}
@@ -146,41 +129,51 @@ namespace LogExporter
return Task.CompletedTask;
}
public async Task<DnsLogPage> QueryLogsAsync(long pageNumber, int entriesPerPage, bool descendingOrder, DateTime? start, DateTime? end, IPAddress clientIpAddress, DnsTransportProtocol? protocol, DnsServerResponseType? responseType, DnsResponseCode? rcode, string qname, DnsResourceRecordType? qtype, DnsClass? qclass)
public Task<DnsLogPage> QueryLogsAsync(long pageNumber, int entriesPerPage, bool descendingOrder, DateTime? start, DateTime? end, IPAddress clientIpAddress, DnsTransportProtocol? protocol, DnsServerResponseType? responseType, DnsResponseCode? rcode, string qname, DnsResourceRecordType? qtype, DnsClass? qclass)
{
return await Task.FromResult(new DnsLogPage(0, 0, 0, _emptyList));
return Task.FromResult(new DnsLogPage(0, 0, 0, _emptyList));
}
#endregion public
#region private
private async Task ExportLogsAsync(CancellationToken cancellationToken = default)
private async Task ExportLogsAsync()
{
var logs = new List<LogEntry>(BULK_INSERT_COUNT);
// Process logs within the timer interval, then let the timer reschedule
while (logs.Count <= BULK_INSERT_COUNT && _logBuffer.TryTake(out var log))
{
logs.Add(log);
}
// If we have any logs to process, export them
if (logs.Count > 0)
{
await _exportManager.ImplementStrategyForAsync(logs);
}
}
private async void HandleExportLogCallback(object? state)
{
try
{
var logs = new List<LogEntry>(BULK_INSERT_COUNT);
while (!cancellationToken.IsCancellationRequested)
{
while ((logs.Count < BULK_INSERT_COUNT) && _logBuffer.TryTake(out LogEntry? log))
{
if (log != null)
logs.Add(log);
}
if (logs.Count > 0)
{
await _exportManager.ImplementStrategyForAsync(logs, cancellationToken);
logs.Clear();
}
}
await ExportLogsAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
_dnsServer?.WriteLog(ex);
}
finally
{
try
{
_queueTimer.Change(QUEUE_TIMER_INTERVAL, Timeout.Infinite);
}
catch (ObjectDisposedException)
{ }
}
}
private void RegisterExportTargets()

View File

@@ -40,7 +40,7 @@ namespace LogExporter
// Load configuration from JSON
public static BufferManagementConfig? Deserialize(string json)
{
return JsonSerializer.Deserialize<BufferManagementConfig>(json);
return JsonSerializer.Deserialize<BufferManagementConfig>(json, DnsConfigSerializerOptions.Default);
}
}
@@ -79,4 +79,18 @@ namespace LogExporter
[JsonPropertyName("headers")]
public Dictionary<string, string>? Headers { get; set; }
}
// Setup reusable options with a single instance
public static class DnsConfigSerializerOptions
{
public static readonly JsonSerializerOptions Default = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // Convert properties to camelCase
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping, // For safe encoding
NumberHandling = JsonNumberHandling.Strict,
AllowTrailingCommas = true, // Allow trailing commas in JSON
DictionaryKeyPolicy = JsonNamingPolicy.CamelCase, // Convert dictionary keys to camelCase
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull // Ignore null values
};
}
}

View File

@@ -0,0 +1,132 @@
using System;
using System.Buffers;
namespace LogExporter
{
public class GrowableBuffer<T> : IBufferWriter<T>, IDisposable
{
// Gets the current length of the buffer contents
public int Length => _position;
// Initial capacity to be used in the constructor
private const int DefaultInitialCapacity = 256;
private Memory<T> _buffer;
private int _position;
private bool disposedValue;
public GrowableBuffer(int initialCapacity = DefaultInitialCapacity)
{
_buffer = new Memory<T>(ArrayPool<T>.Shared.Rent(initialCapacity));
_position = 0;
}
// IBufferWriter<T> implementation
public void Advance(int count)
{
if (count < 0 || _position + count > _buffer.Length)
throw new ArgumentOutOfRangeException(nameof(count));
_position += count;
}
// Appends a single element to the buffer
public void Append(T item)
{
EnsureCapacity(1);
_buffer.Span[_position++] = item;
}
// Appends a span of elements to the buffer
public void Append(ReadOnlySpan<T> span)
{
EnsureCapacity(span.Length);
span.CopyTo(_buffer.Span[_position..]);
_position += span.Length;
}
// Clears the buffer for reuse without reallocating
public void Clear() => _position = 0;
public Memory<T> GetMemory(int sizeHint = 0)
{
EnsureCapacity(sizeHint);
return _buffer[_position..];
}
public Span<T> GetSpan(int sizeHint = 0)
{
EnsureCapacity(sizeHint);
return _buffer.Span[_position..];
}
// Returns the buffer contents as an array
public T[] ToArray()
{
T[] result = new T[_position];
_buffer.Span[.._position].CopyTo(result);
return result;
}
// Returns the buffer contents as a ReadOnlySpan<T>
public ReadOnlySpan<T> ToSpan() => _buffer.Span[.._position];
public override string ToString() => _buffer.Span[.._position].ToString();
// Ensures the buffer has enough capacity to add more elements
private void EnsureCapacity(int additionalCapacity)
{
if (_position + additionalCapacity > _buffer.Length)
{
GrowBuffer(_position + additionalCapacity);
}
}
// Grows the buffer to accommodate the required capacity
private void GrowBuffer(int requiredCapacity)
{
int newCapacity = Math.Max(_buffer.Length * 2, requiredCapacity);
// Rent a larger buffer from the pool
T[] newArray = ArrayPool<T>.Shared.Rent(newCapacity);
Memory<T> newBuffer = new Memory<T>(newArray);
// Copy current contents to the new buffer
_buffer.Span[.._position].CopyTo(newBuffer.Span);
// Return old buffer to the pool
ArrayPool<T>.Shared.Return(_buffer.ToArray());
// Assign the new buffer
_buffer = newBuffer;
}
#region IDisposable
public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
ArrayPool<T>.Shared.Return(_buffer.ToArray());
_buffer = Memory<T>.Empty;
_position = 0;
}
}
disposedValue = true;
}
#endregion IDisposable
}
}

View File

@@ -1,11 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text.Json.Serialization;
using System.Text;
using System.Text.Json;
using TechnitiumLibrary.Net.Dns;
using TechnitiumLibrary.Net.Dns.ResourceRecords;
using System.Collections.Generic;
namespace LogExporter
{
@@ -43,7 +43,7 @@ namespace LogExporter
QuestionName = questionRecord.Name,
QuestionType = questionRecord.Type,
QuestionClass = questionRecord.Class,
Size = questionRecord.UncompressedLength
Size = questionRecord.UncompressedLength,
}));
}
@@ -58,7 +58,7 @@ namespace LogExporter
RecordClass = record.Class,
RecordTtl = record.TTL,
Size = record.UncompressedLength,
DnssecStatus = record.DnssecStatus
DnssecStatus = record.DnssecStatus,
}));
}
@@ -73,11 +73,6 @@ namespace LogExporter
}
}
public override string ToString()
{
return JsonSerializer.Serialize(this, DnsLogSerializerOptions.Default);
}
public class Question
{
public string QuestionName { get; set; }
@@ -95,34 +90,74 @@ namespace LogExporter
public int Size { get; set; }
public DnssecStatus DnssecStatus { get; set; }
}
}
// Custom DateTime converter to handle UTC serialization in ISO 8601 format
public class JsonDateTimeConverter : JsonConverter<DateTime>
{
public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
public ReadOnlySpan<char> AsSpan()
{
var dts = reader.GetString();
return dts == null ? DateTime.MinValue : DateTime.Parse(dts);
// Initialize a ValueStringBuilder with some initial capacity
var buffer = new GrowableBuffer<byte>(256);
using var writer = new Utf8JsonWriter(buffer);
// Manually serialize the LogEntry as JSON
writer.WriteStartObject();
writer.WriteString("timestamp", Timestamp.ToUniversalTime().ToString("O"));
writer.WriteString("clientIp", ClientIp);
writer.WriteNumber("clientPort", ClientPort);
writer.WriteBoolean("dnssecOk", DnssecOk);
writer.WriteString("protocol", Protocol.ToString());
writer.WriteString("responseCode", ResponseCode.ToString());
// Write Questions array
writer.WriteStartArray("questions");
foreach (var question in Questions)
{
writer.WriteStartObject();
writer.WriteString("questionName", question.QuestionName);
writer.WriteString("questionType", question.QuestionType.ToString());
writer.WriteString("questionClass", question.QuestionClass.ToString());
writer.WriteNumber("size", question.Size);
writer.WriteEndObject();
}
writer.WriteEndArray();
// Write Answers array (if exists)
if (Answers != null && Answers.Count > 0)
{
writer.WriteStartArray("answers");
foreach (var answer in Answers)
{
writer.WriteStartObject();
writer.WriteString("recordType", answer.RecordType.ToString());
writer.WriteString("recordData", answer.RecordData);
writer.WriteString("recordClass", answer.RecordClass.ToString());
writer.WriteNumber("recordTtl", answer.RecordTtl);
writer.WriteNumber("size", answer.Size);
writer.WriteString("dnssecStatus", answer.DnssecStatus.ToString());
writer.WriteEndObject();
}
writer.WriteEndArray();
}
writer.WriteEndObject();
writer.Flush();
return ConvertBytesToChars(buffer.ToSpan());
}
public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options)
public static Span<char> ConvertBytesToChars(ReadOnlySpan<byte> byteSpan)
{
writer.WriteStringValue(value.ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss.fffZ"));
}
}
// Calculate the maximum required length for the char array
int maxCharCount = Encoding.UTF8.GetCharCount(byteSpan);
// Setup reusable options with a single instance
public static class DnsLogSerializerOptions
{
public static readonly JsonSerializerOptions Default = new JsonSerializerOptions
{
WriteIndented = false, // Newline delimited logs should not be multiline
PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // Convert properties to camelCase
Converters = { new JsonStringEnumConverter(), new JsonDateTimeConverter() }, // Handle enums and DateTime conversion
Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping, // For safe encoding
NumberHandling = JsonNumberHandling.Strict,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull // Ignore null values
};
}
// Allocate a char array large enough to hold the converted characters
char[] charArray = new char[maxCharCount];
// Decode the byteSpan into the char array
int actualCharCount = Encoding.UTF8.GetChars(byteSpan, charArray);
// Return a span of only the relevant portion of the char array
return new Span<char>(charArray, 0, actualCharCount);
}
};
}

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace LogExporter.Strategy
@@ -43,25 +42,24 @@ namespace LogExporter.Strategy
#region public
public IExportStrategy? GetStrategy<T>() where T : IExportStrategy
{
_exportStrategies.TryGetValue(typeof(T), out var strategy);
return strategy;
}
public async Task ImplementStrategyForAsync(List<LogEntry> logs, CancellationToken cancellationToken = default)
{
foreach (var strategy in _exportStrategies.Values)
{
await strategy.ExportLogsAsync(logs, cancellationToken);
}
}
public void AddOrReplaceStrategy(IExportStrategy strategy)
{
_exportStrategies[strategy.GetType()] = strategy;
}
public bool HasStrategy()
{
return _exportStrategies.Count > 0;
}
public async Task ImplementStrategyForAsync(List<LogEntry> logs)
{
foreach (var strategy in _exportStrategies.Values)
{
await strategy.ExportAsync(logs).ConfigureAwait(false);
}
}
#endregion public
}
}

View File

@@ -17,16 +17,18 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace LogExporter.Strategy
{
public class FileExportStrategy : IExportStrategy
public partial class FileExportStrategy : IExportStrategy
{
#region variables
private static readonly SemaphoreSlim _fileSemaphore = new SemaphoreSlim(1, 1);
@@ -35,6 +37,7 @@ namespace LogExporter.Strategy
private bool disposedValue;
#endregion variables
#region constructor
@@ -42,33 +45,46 @@ namespace LogExporter.Strategy
public FileExportStrategy(string filePath)
{
_filePath = filePath;
}
#endregion constructor
#region public
public async Task ExportLogsAsync(List<LogEntry> logs, CancellationToken cancellationToken = default)
public Task ExportAsync(List<LogEntry> logs)
{
var jsonLogs = new StringBuilder(logs.Count);
var buffer = new GrowableBuffer<char>();
foreach (var log in logs)
{
jsonLogs.AppendLine(log.ToString());
buffer.Append(log.AsSpan());
buffer.Append('\n');
}
Flush(buffer.ToSpan());
return Task.CompletedTask;
}
private void Flush(ReadOnlySpan<char> jsonLogs)
{
// Wait to enter the semaphore
await _fileSemaphore.WaitAsync(cancellationToken);
_fileSemaphore.Wait();
try
{
// Use a FileStream with exclusive access
using var fileStream = new FileStream(_filePath, FileMode.Append, FileAccess.Write, FileShare.None);
using var writer = new StreamWriter(fileStream);
await writer.WriteAsync(jsonLogs.ToString());
var fileStream = new FileStream(_filePath, FileMode.Append, FileAccess.Write, FileShare.Write);
var writer = new StreamWriter(fileStream);
writer.Write(jsonLogs);
writer.Close();
fileStream.Dispose();
}
catch (Exception ex)
{
Debug.WriteLine(ex);
}
finally
{
// Release the semaphore
_fileSemaphore.Release();
_ = _fileSemaphore.Release();
}
}
@@ -96,6 +112,7 @@ namespace LogExporter.Strategy
disposedValue = true;
}
}
#endregion IDisposable
}
}

View File

@@ -21,7 +21,6 @@ using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace LogExporter.Strategy
@@ -56,18 +55,20 @@ namespace LogExporter.Strategy
#region public
public async Task ExportLogsAsync(List<LogEntry> logs, CancellationToken cancellationToken = default)
public async Task ExportAsync(List<LogEntry> logs)
{
var jsonLogs = new StringBuilder(logs.Count);
var buffer = new GrowableBuffer<char>();
foreach (var log in logs)
{
jsonLogs.AppendLine(log.ToString());
buffer.Append(log.AsSpan());
buffer.Append('\n');
}
var content = buffer.ToString() ?? string.Empty;
var request = new HttpRequestMessage
{
RequestUri = new Uri(_endpoint),
Method = new HttpMethod(_method),
Content = new StringContent(jsonLogs.ToString(), Encoding.UTF8, "application/json")
Content = new StringContent( content, Encoding.UTF8, "application/json")
};
if (_headers != null)
@@ -78,7 +79,7 @@ namespace LogExporter.Strategy
}
}
var response = await _httpClient.SendAsync(request, cancellationToken);
var response = await _httpClient.SendAsync(request);
if (!response.IsSuccessStatusCode)
{
throw new Exception($"Failed to export logs to {_endpoint}: {response.StatusCode}");

View File

@@ -19,7 +19,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace LogExporter.Strategy
@@ -29,6 +28,6 @@ namespace LogExporter.Strategy
/// </summary>
public interface IExportStrategy: IDisposable
{
Task ExportLogsAsync(List<LogEntry> logs, CancellationToken cancellationToken = default);
Task ExportAsync(List<LogEntry> logs);
}
}

View File

@@ -23,7 +23,6 @@ using SyslogNet.Client.Transport;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace LogExporter.Strategy
@@ -79,14 +78,13 @@ namespace LogExporter.Strategy
#region public
public Task ExportLogsAsync(List<LogEntry> logs, CancellationToken cancellationToken = default)
public Task ExportAsync(List<LogEntry> logs)
{
return Task.Run(() =>
{
var messages = new List<SyslogMessage>(logs.Select(Convert));
_sender.Send(messages, _serializer);
}
, cancellationToken);
});
}
#endregion public
@@ -120,7 +118,7 @@ namespace LogExporter.Strategy
private SyslogMessage Convert(LogEntry log)
{
// Create the structured data with all key details from LogEntry
var elements = new StructuredDataElement(_sdId, new Dictionary<string, string>
var elements = new StructuredDataElement(_sdId, new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
{
{ "timestamp", log.Timestamp.ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ss.fffZ") },
{ "clientIp", log.ClientIp },
@@ -173,17 +171,16 @@ namespace LogExporter.Strategy
string questionSummary = log.Questions?.Count > 0
? string.Join("; ", log.Questions.Select(q =>
$"QNAME: {q.QuestionName}; QTYPE: {q.QuestionType?.ToString() ?? "unknown"}; QCLASS: {q.QuestionClass?.ToString() ?? "unknown"}"))
: "No Questions";
: string.Empty;
// Build the answer summary in the desired format
string answerSummary = log.Answers?.Count > 0
? string.Join(", ", log.Answers.Select(a => a.RecordData))
: "No Answers";
: string.Empty;
// Construct the message summary string to match the desired format
string messageSummary = $"{questionSummary}; RCODE: {log.ResponseCode}; ANSWER: [{answerSummary}]";
// Create and return the syslog message
return new SyslogMessage(
log.Timestamp,

View File

@@ -2,7 +2,7 @@
"maxLogEntries": 1000,
"file": {
"path": "/var/log/dns_logs.json",
"enabled": true
"enabled": false
},
"http": {
"endpoint": "http://example.com/logs",
@@ -10,12 +10,12 @@
"headers": {
"Authorization": "Bearer abc123"
},
"enabled": true
"enabled": false
},
"syslog": {
"address": "127.0.0.1",
"port": 514,
"protocol": "UDP",
"enabled": true
"enabled": false
}
}