logging: Implement get_delay_queue_size()

Primarily for introspection given that re-delaying may exceed
queue sizes.
This commit is contained in:
Arne Welzel 2023-11-23 21:09:41 +01:00
parent f0e67022fd
commit 2dbb467ba2
8 changed files with 213 additions and 0 deletions

View file

@ -719,6 +719,13 @@ export {
## ##
## Returns: ``T`` on success, else ``F``. ## Returns: ``T`` on success, else ``F``.
global set_max_delay_queue_size: function(id: Log::ID, queue_size: count): bool; global set_max_delay_queue_size: function(id: Log::ID, queue_size: count): bool;
## Get the current size of the delay queue for a stream.
##
## id: The ID associated with a logging stream.
##
## Returns: The current size of the delay queue, or -1 on error.
global get_delay_queue_size: function(id: Log::ID): int;
} }
global all_streams: table[ID] of Stream = table(); global all_streams: table[ID] of Stream = table();
@ -1066,3 +1073,8 @@ function set_max_delay_queue_size(id: Log::ID, max_size: count): bool
return T; return T;
} }
function get_delay_queue_size(id: Log::ID): int
{
return Log::__get_delay_queue_size(id);
}

View file

@ -1312,6 +1312,14 @@ bool Manager::SetMaxDelayInterval(const EnumValPtr& id, double delay) {
return true; return true;
} }
zeek_int_t Manager::GetDelayQueueSize(const EnumValPtr& id) {
Stream* stream = FindStream(id.get());
if ( ! stream )
return -1;
return stream->delay_queue.size();
}
bool Manager::SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t queue_size) { bool Manager::SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t queue_size) {
Stream* stream = FindStream(id.get()); Stream* stream = FindStream(id.get());
if ( ! stream ) if ( ! stream )

View file

@ -236,6 +236,15 @@ public:
*/ */
bool SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t max_queue_length); bool SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t max_queue_length);
/**
* Returns the current size for the delay queue for the stream identified by \a id.
*
* @param id The enum value corresponding to the log stream.
*
* @return The size of the delay queue or -1 on error.
*/
zeek_int_t GetDelayQueueSize(const EnumValPtr& id);
/** /**
* Create a new log writer frontend. This is exposed so that the * Create a new log writer frontend. This is exposed so that the
* communication system can recreate remote log streams locally. * communication system can recreate remote log streams locally.

View file

@ -145,3 +145,12 @@ function Log::__set_max_delay_queue_size%(id: Log::ID, max_queue_size: count%):
bool result = zeek::log_mgr->SetMaxDelayQueueSize(idptr, max_queue_size); bool result = zeek::log_mgr->SetMaxDelayQueueSize(idptr, max_queue_size);
return zeek::val_mgr->Bool(result); return zeek::val_mgr->Bool(result);
%} %}
function Log::__get_delay_queue_size%(id: Log::ID%): int
%{
auto idptr = enum_ref(id);
if ( ! idptr )
return zeek::val_mgr->Bool(false);
return zeek::val_mgr->Int(zeek::log_mgr->GetDelayQueueSize(idptr));
%}

View file

@ -0,0 +1 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.

View file

@ -0,0 +1,75 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
0.0, queue_size non-existing, -1
0.0, queue_size existing, 0
1362692526.869344, new_packet, 1, queue size, 0
1362692526.869344, log_stream_policy, LOG, [ts=1362692526.869344, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 1]
1362692526.869344, queue_size post write, 1
1362692526.939084, post_delay_cb, [ts=1362692526.869344, post_ts=1362692526.939084, write_ts=<uninitialized>, msg=packet number 1], 69.0 msecs 740.056992 usecs
1362692526.939084, log_policy
1362692526.939084, new_packet, 2, queue size, 0
1362692526.939084, log_stream_policy, LOG, [ts=1362692526.939084, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 2]
1362692526.939084, queue_size post write, 1
1362692526.939378, new_packet, 3, queue size, 1
1362692526.939378, log_stream_policy, LOG, [ts=1362692526.939378, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 3]
1362692526.939378, queue_size post write, 2
1362692526.939527, new_packet, 4, queue size, 2
1362692526.939527, log_stream_policy, LOG, [ts=1362692526.939527, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 4]
1362692526.939527, queue_size post write, 3
1362692527.008509, post_delay_cb, [ts=1362692526.939084, post_ts=1362692527.008509, write_ts=<uninitialized>, msg=packet number 2], 69.0 msecs 424.86763 usecs
1362692527.008509, log_policy
1362692527.008509, post_delay_cb, [ts=1362692526.939378, post_ts=1362692527.008509, write_ts=<uninitialized>, msg=packet number 3], 69.0 msecs 130.897522 usecs
1362692527.008509, log_policy
1362692527.008509, post_delay_cb, [ts=1362692526.939527, post_ts=1362692527.008509, write_ts=<uninitialized>, msg=packet number 4], 68.0 msecs 981.88591 usecs
1362692527.008509, log_policy
1362692527.008509, new_packet, 5, queue size, 0
1362692527.008509, log_stream_policy, LOG, [ts=1362692527.008509, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 5]
1362692527.008509, queue_size post write, 1
1362692527.009512, post_delay_cb, [ts=1362692527.008509, post_ts=1362692527.009512, write_ts=<uninitialized>, msg=packet number 5], 1.0 msec 3.026962 usecs
1362692527.009512, log_policy
1362692527.009512, new_packet, 6, queue size, 0
1362692527.009512, log_stream_policy, LOG, [ts=1362692527.009512, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 6]
1362692527.009512, queue_size post write, 1
1362692527.009721, new_packet, 7, queue size, 1
1362692527.009721, log_stream_policy, LOG, [ts=1362692527.009721, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 7]
1362692527.009721, queue_size post write, 2
1362692527.009765, new_packet, 8, queue size, 2
1362692527.009765, log_stream_policy, LOG, [ts=1362692527.009765, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 8]
1362692527.009765, queue_size post write, 3
1362692527.009775, new_packet, 9, queue size, 3
1362692527.009775, log_stream_policy, LOG, [ts=1362692527.009775, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 9]
1362692527.009775, queue_size post write, 4
1362692527.009855, new_packet, 10, queue size, 4
1362692527.009855, log_stream_policy, LOG, [ts=1362692527.009855, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 10]
1362692527.009855, queue_size post write, 5
1362692527.009887, new_packet, 11, queue size, 5
1362692527.009887, log_stream_policy, LOG, [ts=1362692527.009887, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 11]
1362692527.009887, queue_size post write, 6
1362692527.011846, post_delay_cb, [ts=1362692527.009512, post_ts=1362692527.011846, write_ts=<uninitialized>, msg=packet number 6], 2.0 msecs 334.117889 usecs
1362692527.011846, log_policy
1362692527.011846, post_delay_cb, [ts=1362692527.009721, post_ts=1362692527.011846, write_ts=<uninitialized>, msg=packet number 7], 2.0 msecs 125.024796 usecs
1362692527.011846, log_policy
1362692527.011846, post_delay_cb, [ts=1362692527.009765, post_ts=1362692527.011846, write_ts=<uninitialized>, msg=packet number 8], 2.0 msecs 81.155777 usecs
1362692527.011846, log_policy
1362692527.011846, post_delay_cb, [ts=1362692527.009775, post_ts=1362692527.011846, write_ts=<uninitialized>, msg=packet number 9], 2.0 msecs 71.142197 usecs
1362692527.011846, log_policy
1362692527.011846, post_delay_cb, [ts=1362692527.009855, post_ts=1362692527.011846, write_ts=<uninitialized>, msg=packet number 10], 1.0 msec 991.033554 usecs
1362692527.011846, log_policy
1362692527.011846, post_delay_cb, [ts=1362692527.009887, post_ts=1362692527.011846, write_ts=<uninitialized>, msg=packet number 11], 1.0 msec 959.085464 usecs
1362692527.011846, log_policy
1362692527.011846, new_packet, 12, queue size, 0
1362692527.011846, log_stream_policy, LOG, [ts=1362692527.011846, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 12]
1362692527.011846, queue_size post write, 1
1362692527.080828, post_delay_cb, [ts=1362692527.011846, post_ts=1362692527.080828, write_ts=<uninitialized>, msg=packet number 12], 68.0 msecs 981.88591 usecs
1362692527.080828, log_policy
1362692527.080828, new_packet, 13, queue size, 0
1362692527.080828, log_stream_policy, LOG, [ts=1362692527.080828, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 13]
1362692527.080828, queue_size post write, 1
1362692527.080972, new_packet, 14, queue size, 1
1362692527.080972, log_stream_policy, LOG, [ts=1362692527.080972, post_ts=<uninitialized>, write_ts=<uninitialized>, msg=packet number 14]
1362692527.080972, queue_size post write, 2
1362692527.080972, Pcap::file_done, <...>/get.trace
1362692527.080972, post_delay_cb, [ts=1362692527.080828, post_ts=1362692527.080972, write_ts=<uninitialized>, msg=packet number 13], 144.004822 usecs
1362692527.080972, log_policy
1362692527.080972, post_delay_cb, [ts=1362692527.080972, post_ts=1362692527.080972, write_ts=<uninitialized>, msg=packet number 14], 0 secs
1362692527.080972, log_policy
1362692527.080972, queue_size done, 0

View file

@ -0,0 +1,16 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
ts|post_ts|write_ts|msg
1362692526.869344|1362692526.939084|1362692526.939084|packet number 1
1362692526.939084|1362692527.008509|1362692527.008509|packet number 2
1362692526.939378|1362692527.008509|1362692527.008509|packet number 3
1362692526.939527|1362692527.008509|1362692527.008509|packet number 4
1362692527.008509|1362692527.009512|1362692527.009512|packet number 5
1362692527.009512|1362692527.011846|1362692527.011846|packet number 6
1362692527.009721|1362692527.011846|1362692527.011846|packet number 7
1362692527.009765|1362692527.011846|1362692527.011846|packet number 8
1362692527.009775|1362692527.011846|1362692527.011846|packet number 9
1362692527.009855|1362692527.011846|1362692527.011846|packet number 10
1362692527.009887|1362692527.011846|1362692527.011846|packet number 11
1362692527.011846|1362692527.080828|1362692527.080828|packet number 12
1362692527.080828|1362692527.080972|1362692527.080972|packet number 13
1362692527.080972|1362692527.080972|1362692527.080972|packet number 14

View file

@ -0,0 +1,83 @@
# @TEST-DOC: Delay queue testing.
# @TEST-EXEC: zeek -B logging,tm -b -r $TRACES/http/get.trace test.zeek %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
# @TEST-EXEC: zeek-cut -m -F'|' < test.log > test.cut
# @TEST-EXEC: TEST_DIFF_CANONIFIER= btest-diff test.cut
@TEST-START-FILE test.zeek
# Used by all tests below.
# Debug printing
global packet_count = 0;
redef enum Log::ID += {
LOG
};
type Info: record {
ts: time &log;
post_ts: time &log &optional;
write_ts: time &log &optional;
msg: string &log;
};
event new_packet(c: connection, p: pkt_hdr)
{
++packet_count;
print network_time(), "new_packet", packet_count, "queue size", Log::get_delay_queue_size(LOG);
local info = Info($ts=network_time(), $msg=fmt("packet number %s", packet_count));
Log::write(LOG, info);
print network_time(), "queue_size post write", Log::get_delay_queue_size(LOG);
}
hook log_policy(rec: Info, id: Log::ID, filter: Log::Filter)
{
print network_time(), "log_policy";
rec$write_ts = network_time();
}
event Pcap::file_done(p: string)
{
print network_time(), "Pcap::file_done", p;
}
@TEST-END-FILE test.zeek
event zeek_init()
{
print network_time(), "queue_size non-existing", Log::get_delay_queue_size(LOG);
Log::create_stream(LOG, [
$columns=Info,
$path="test",
$policy=log_policy,
$max_delay_interval=1msec,
]);
print network_time(), "queue_size existing", Log::get_delay_queue_size(LOG);
}
event zeek_done()
{
print network_time(), "queue_size done", Log::get_delay_queue_size(LOG);
}
hook Log::log_stream_policy(rec: Info, id: Log::ID)
{
if ( id != LOG )
return;
local now = network_time();
print now, "log_stream_policy", id, rec;
Log::delay(id, rec, function[now](rec2: Info, id2: Log::ID): bool {
local delayed_for = network_time() - now;
rec2$post_ts = network_time();
print network_time(), "post_delay_cb", rec2, delayed_for;
return T;
});
}