QueryLogsSqlite: updated app to support in-memory db and added option to limit max records in db. Refactored code to work async.

This commit is contained in:
Shreyas Zare
2023-12-03 21:18:10 +05:30
parent bb057ab9b1
commit 0e283d0864

View File

@@ -22,11 +22,13 @@ using Microsoft.Data.Sqlite;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.IO;
using System.Net;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using TechnitiumLibrary;
using TechnitiumLibrary.Net.Dns;
using TechnitiumLibrary.Net.Dns.ResourceRecords;
@@ -40,19 +42,99 @@ namespace QueryLogsSqlite
bool _enableLogging;
int _maxLogDays;
int _maxLogRecords;
bool _useInMemoryDb;
string _connectionString;
SqliteConnection _inMemoryConnection;
readonly ConcurrentQueue<LogEntry> _queuedLogs = new ConcurrentQueue<LogEntry>();
Timer _queueTimer;
readonly Timer _queueTimer;
const int QUEUE_TIMER_INTERVAL = 10000;
const int BULK_INSERT_COUNT = 1000;
Timer _cleanupTimer;
readonly Timer _cleanupTimer;
const int CLEAN_UP_TIMER_INITIAL_INTERVAL = 5 * 1000;
const int CLEAN_UP_TIMER_PERIODIC_INTERVAL = 15 * 60 * 1000;
#endregion
#region constructor
public App()
{
_queueTimer = new Timer(async delegate (object state)
{
try
{
await BulkInsertLogsAsync();
}
catch (Exception ex)
{
_dnsServer.WriteLog(ex);
}
finally
{
try
{
_queueTimer.Change(QUEUE_TIMER_INTERVAL, Timeout.Infinite);
}
catch (ObjectDisposedException)
{ }
}
});
_cleanupTimer = new Timer(async delegate (object state)
{
try
{
await using (SqliteConnection connection = new SqliteConnection(_connectionString))
{
await connection.OpenAsync();
if (_maxLogRecords > 0)
{
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "DELETE FROM dns_logs WHERE ROWID IN (SELECT ROWID FROM dns_logs ORDER BY ROWID DESC LIMIT -1 OFFSET @maxLogRecords);";
command.Parameters.AddWithValue("@maxLogRecords", _maxLogRecords);
await command.ExecuteNonQueryAsync();
}
}
if (_maxLogDays > 0)
{
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "DELETE FROM dns_logs WHERE timestamp < @timestamp;";
command.Parameters.AddWithValue("@timestamp", DateTime.UtcNow.AddDays(_maxLogDays * -1));
await command.ExecuteNonQueryAsync();
}
}
}
}
catch (Exception ex)
{
_dnsServer.WriteLog(ex);
}
finally
{
try
{
_cleanupTimer.Change(CLEAN_UP_TIMER_PERIODIC_INTERVAL, Timeout.Infinite);
}
catch (ObjectDisposedException)
{ }
}
});
}
#endregion
#region IDisposable
public void Dispose()
@@ -60,18 +142,19 @@ namespace QueryLogsSqlite
_enableLogging = false; //turn off logging
if (_queueTimer is not null)
{
_queueTimer.Dispose();
_queueTimer = null;
}
if (_cleanupTimer is not null)
{
_cleanupTimer.Dispose();
_cleanupTimer = null;
BulkInsertLogsAsync().Sync(); //flush any pending logs
if (_inMemoryConnection is not null)
{
_inMemoryConnection.Dispose();
_inMemoryConnection = null;
}
BulkInsertLogs(); //flush any pending logs
SqliteConnection.ClearAllPools(); //close db file
}
@@ -79,7 +162,7 @@ namespace QueryLogsSqlite
#region private
private void BulkInsertLogs()
private async Task BulkInsertLogsAsync()
{
try
{
@@ -95,13 +178,13 @@ namespace QueryLogsSqlite
if (logs.Count < 1)
break;
using (SqliteConnection connection = new SqliteConnection(_connectionString))
await using (SqliteConnection connection = new SqliteConnection(_connectionString))
{
connection.Open();
await connection.OpenAsync();
using (SqliteTransaction transaction = connection.BeginTransaction())
await using (DbTransaction transaction = await connection.BeginTransactionAsync())
{
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "INSERT INTO dns_logs (timestamp, client_ip, protocol, response_type, rcode, qname, qtype, qclass, answer) VALUES (@timestamp, @client_ip, @protocol, @response_type, @rcode, @qname, @qtype, @qclass, @answer);";
@@ -166,10 +249,10 @@ namespace QueryLogsSqlite
paramAnswer.Value = answer;
}
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
transaction.Commit();
await transaction.CommitAsync();
}
}
}
@@ -188,29 +271,57 @@ namespace QueryLogsSqlite
#region public
public Task InitializeAsync(IDnsServer dnsServer, string config)
public async Task InitializeAsync(IDnsServer dnsServer, string config)
{
_dnsServer = dnsServer;
using JsonDocument jsonDocument = JsonDocument.Parse(config);
JsonElement jsonConfig = jsonDocument.RootElement;
_enableLogging = jsonConfig.GetProperty("enableLogging").GetBoolean();
_maxLogDays = jsonConfig.GetProperty("maxLogDays").GetInt32();
_enableLogging = jsonConfig.GetPropertyValue("enableLogging", true);
_maxLogDays = jsonConfig.GetPropertyValue("maxLogDays", 0);
_maxLogRecords = jsonConfig.GetPropertyValue("maxLogRecords", 0);
_useInMemoryDb = jsonConfig.GetPropertyValue("useInMemoryDb", false);
string sqliteDbPath = jsonConfig.GetProperty("sqliteDbPath").GetString();
string connectionString = jsonConfig.GetProperty("connectionString").GetString();
if (!Path.IsPathRooted(sqliteDbPath))
sqliteDbPath = Path.Combine(_dnsServer.ApplicationFolder, sqliteDbPath);
_connectionString = connectionString.Replace("{sqliteDbPath}", sqliteDbPath);
using (SqliteConnection connection = new SqliteConnection(_connectionString))
if (_useInMemoryDb)
{
connection.Open();
if (_inMemoryConnection is null)
{
SqliteConnection.ClearAllPools(); //close db file, if any
using (SqliteCommand command = connection.CreateCommand())
_connectionString = "Data Source=QueryLogs;Mode=Memory;Cache=Shared";
_inMemoryConnection = new SqliteConnection(_connectionString);
await _inMemoryConnection.OpenAsync();
}
}
else
{
if (_inMemoryConnection is not null)
{
await _inMemoryConnection.DisposeAsync();
_inMemoryConnection = null;
}
string sqliteDbPath = jsonConfig.GetPropertyValue("sqliteDbPath", "querylogs.db");
string connectionString = jsonConfig.GetPropertyValue("connectionString", "Data Source='{sqliteDbPath}'; Cache=Shared;");
if (!Path.IsPathRooted(sqliteDbPath))
sqliteDbPath = Path.Combine(_dnsServer.ApplicationFolder, sqliteDbPath);
connectionString = connectionString.Replace("{sqliteDbPath}", sqliteDbPath);
if ((_connectionString is not null) && !_connectionString.Equals(connectionString))
SqliteConnection.ClearAllPools(); //close previous db file
_connectionString = connectionString;
}
await using (SqliteConnection connection = new SqliteConnection(_connectionString))
{
await connection.OpenAsync();
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = @"
CREATE TABLE IF NOT EXISTS dns_logs
@@ -227,161 +338,104 @@ CREATE TABLE IF NOT EXISTS dns_logs
answer TEXT
);
";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_timestamp ON dns_logs (timestamp);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_client_ip ON dns_logs (client_ip);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_protocol ON dns_logs (protocol);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_response_type ON dns_logs (response_type);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_rcode ON dns_logs (rcode);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_qname ON dns_logs (qname);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_qtype ON dns_logs (qtype);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_qclass ON dns_logs (qclass);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_timestamp_client_ip ON dns_logs (timestamp, client_ip);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_timestamp_qname ON dns_logs (timestamp, qname);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_client_qname ON dns_logs (client_ip, qname);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_query ON dns_logs (qname, qtype);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "CREATE INDEX IF NOT EXISTS index_all ON dns_logs (timestamp, client_ip, protocol, response_type, rcode, qname, qtype, qclass);";
command.ExecuteNonQuery();
await command.ExecuteNonQueryAsync();
}
}
if (_enableLogging)
{
_queueTimer = new Timer(delegate (object state)
{
try
{
BulkInsertLogs();
}
catch (Exception ex)
{
_dnsServer.WriteLog(ex);
}
finally
{
if (_queueTimer is not null)
_queueTimer.Change(QUEUE_TIMER_INTERVAL, Timeout.Infinite);
}
});
_queueTimer.Change(QUEUE_TIMER_INTERVAL, Timeout.Infinite);
}
else
{
if (_queueTimer is not null)
{
_queueTimer.Dispose();
_queueTimer = null;
}
}
if (_maxLogDays < 1)
{
if (_cleanupTimer is not null)
{
_cleanupTimer.Dispose();
_cleanupTimer = null;
}
}
else
{
_cleanupTimer = new Timer(delegate (object state)
{
try
{
using (SqliteConnection connection = new SqliteConnection(_connectionString))
{
connection.Open();
using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "DELETE FROM dns_logs WHERE timestamp < @timestamp;";
command.Parameters.AddWithValue("@timestamp", DateTime.UtcNow.AddDays(_maxLogDays * -1));
command.ExecuteNonQuery();
}
}
}
catch (Exception ex)
{
_dnsServer.WriteLog(ex);
}
finally
{
if (_cleanupTimer is not null)
_cleanupTimer.Change(CLEAN_UP_TIMER_PERIODIC_INTERVAL, Timeout.Infinite);
}
});
_queueTimer.Change(Timeout.Infinite, Timeout.Infinite);
if ((_maxLogDays > 0) || (_maxLogRecords > 0))
_cleanupTimer.Change(CLEAN_UP_TIMER_INITIAL_INTERVAL, Timeout.Infinite);
}
else
_cleanupTimer.Change(Timeout.Infinite, Timeout.Infinite);
return Task.CompletedTask;
if (!jsonConfig.TryGetProperty("maxLogRecords", out _))
{
config = config.Replace("\"sqliteDbPath\"", "\"maxLogRecords\": 0,\r\n \"useInMemoryDb\": false,\r\n \"sqliteDbPath\"");
await File.WriteAllTextAsync(Path.Combine(dnsServer.ApplicationFolder, "dnsApp.config"), config);
}
}
public Task InsertLogAsync(DateTime timestamp, DnsDatagram request, IPEndPoint remoteEP, DnsTransportProtocol protocol, DnsDatagram response)
@@ -392,7 +446,7 @@ CREATE TABLE IF NOT EXISTS dns_logs
return Task.CompletedTask;
}
public Task<DnsLogPage> QueryLogsAsync(long pageNumber, int entriesPerPage, bool descendingOrder, DateTime? start, DateTime? end, IPAddress clientIpAddress, DnsTransportProtocol? protocol, DnsServerResponseType? responseType, DnsResponseCode? rcode, string qname, DnsResourceRecordType? qtype, DnsClass? qclass)
public async Task<DnsLogPage> QueryLogsAsync(long pageNumber, int entriesPerPage, bool descendingOrder, DateTime? start, DateTime? end, IPAddress clientIpAddress, DnsTransportProtocol? protocol, DnsServerResponseType? responseType, DnsResponseCode? rcode, string qname, DnsResourceRecordType? qtype, DnsClass? qclass)
{
if (pageNumber == 0)
pageNumber = 1;
@@ -442,14 +496,14 @@ CREATE TABLE IF NOT EXISTS dns_logs
if (!string.IsNullOrEmpty(whereClause))
whereClause = whereClause.Substring(0, whereClause.Length - 5);
using (SqliteConnection connection = new SqliteConnection(_connectionString))
await using (SqliteConnection connection = new SqliteConnection(_connectionString))
{
connection.Open();
await connection.OpenAsync();
//find total entries
long totalEntries;
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = "SELECT Count(*) FROM dns_logs" + (string.IsNullOrEmpty(whereClause) ? ";" : " WHERE " + whereClause + ";");
@@ -480,7 +534,7 @@ CREATE TABLE IF NOT EXISTS dns_logs
if (qclass is not null)
command.Parameters.AddWithValue("@qclass", (ushort)qclass);
totalEntries = (long)command.ExecuteScalar();
totalEntries = (long)await command.ExecuteScalarAsync();
}
long totalPages = (totalEntries / entriesPerPage) + (totalEntries % entriesPerPage > 0 ? 1 : 0);
@@ -504,7 +558,7 @@ CREATE TABLE IF NOT EXISTS dns_logs
List<DnsLogEntry> entries = new List<DnsLogEntry>(entriesPerPage);
using (SqliteCommand command = connection.CreateCommand())
await using (SqliteCommand command = connection.CreateCommand())
{
command.CommandText = @"
SELECT * FROM (
@@ -559,9 +613,9 @@ ORDER BY row_num" + (descendingOrder ? " DESC" : "");
if (qclass is not null)
command.Parameters.AddWithValue("@qclass", (ushort)qclass);
using (SqliteDataReader reader = command.ExecuteReader())
await using (SqliteDataReader reader = await command.ExecuteReaderAsync())
{
while (reader.Read())
while (await reader.ReadAsync())
{
DnsQuestionRecord question;
@@ -582,7 +636,7 @@ ORDER BY row_num" + (descendingOrder ? " DESC" : "");
}
}
return Task.FromResult(new DnsLogPage(pageNumber, totalPages, totalEntries, entries));
return new DnsLogPage(pageNumber, totalPages, totalEntries, entries);
}
}