logging: Add telemetry for streams and log writers

This adds one metric per log stream and one metric per log writer (path based)
to track the number of writes on a stream level as well as on a writer level.

    $ curl -sSf localhost:8181/metrics | grep Conn
    zeek_log_writer_writes_total{endpoint="",filter-name="default",module="HTTP",path="http",stream="HTTP::LOG",writer="Log::WRITER_SQLITE"} 1 1677497572770
    zeek_log_stream_writes_total{endpoint="",module="HTTP",stream="HTTP::LOG"} 1 1677497572770

The initial version of this change also included metrics around log
write vetoes, but given no log policies exist in the default configuration
and they are mostly interesting for a few streams/writers only, skip this
for now. These can always be added by the script writer, too.

The difference between the stream level writes and concrete writers can
be used to deduce the number of vetoes (or errors) as a starting point.
This commit is contained in:
Arne Welzel 2023-02-01 17:16:11 +01:00
parent 2b4aa38315
commit 69a98e2cbb
5 changed files with 104 additions and 2 deletions

View file

@ -3,6 +3,7 @@
#include "zeek/logging/Manager.h"
#include <broker/endpoint_info.hh>
#include <optional>
#include <utility>
#include "zeek/Desc.h"
@ -21,6 +22,7 @@
#include "zeek/logging/logging.bif.h"
#include "zeek/plugin/Manager.h"
#include "zeek/plugin/Plugin.h"
#include "zeek/telemetry/Manager.h"
#include "zeek/threading/Manager.h"
#include "zeek/threading/SerialTypes.h"
@ -73,6 +75,10 @@ struct Manager::WriterInfo
bool from_remote = false;
bool hook_initialized = false;
string instantiating_filter;
telemetry::IntCounter total_writes;
WriterInfo(telemetry::IntCounter total_writes) : total_writes(total_writes) { }
};
struct Manager::Stream
@ -92,6 +98,8 @@ struct Manager::Stream
bool enable_remote = false;
std::optional<telemetry::IntCounter> total_writes; // Initialized on first write.
~Stream();
};
@ -132,7 +140,16 @@ Manager::Stream::~Stream()
delete *f;
}
Manager::Manager() : plugin::ComponentManager<logging::Component>("Log", "Writer")
Manager::Manager()
: plugin::ComponentManager<logging::Component>("Log", "Writer"),
total_log_stream_writes_family(telemetry_mgr->CounterFamily(
"zeek", "log-stream-writes", {"module", "stream"},
"Total number of log writes for the given stream.", "1", true)),
total_log_writer_writes_family(telemetry_mgr->CounterFamily(
"zeek", "log-writer-writes", {"writer", "module", "stream", "filter-name", "path"},
"Total number of log writes passed to a concrete log writer not vetoed by stream or "
"filter policies.",
"1", true))
{
rotations_pending = 0;
}
@ -734,6 +751,16 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg)
}
}
if ( ! stream->total_writes )
{
std::string module_name = detail::extract_module_name(stream->name.c_str());
std::initializer_list<telemetry::LabelView> labels{{"module", module_name},
{"stream", stream->name}};
stream->total_writes = total_log_stream_writes_family.GetOrAdd(labels);
}
stream->total_writes->Inc();
// Send to each of our filters.
for ( list<Filter*>::iterator i = stream->filters.begin(); i != stream->filters.end(); ++i )
{
@ -905,6 +932,9 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg)
if ( ! writer )
return false;
// Find the newly inserted WriterInfo record.
w = stream->writers.find(wpp);
}
// Alright, can do the write now.
@ -927,6 +957,9 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg)
return true;
}
assert(w != stream->writers.end());
w->second->total_writes.Inc();
// Write takes ownership of vals.
assert(writer);
writer->Write(filter->num_fields, vals);
@ -1170,7 +1203,17 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
return w->second->writer;
}
WriterInfo* winfo = new WriterInfo;
// Initialize metric for this frontend.
std::string stream_module_name = detail::extract_module_name(stream->name.c_str());
std::string writer_name = writer->GetType()->AsEnumType()->Lookup(writer->AsEnum());
std::initializer_list<telemetry::LabelView> labels{{"writer", writer_name},
{"module", stream_module_name},
{"stream", stream->name},
{"filter-name", instantiating_filter},
{"path", info->path}};
WriterInfo* winfo = new WriterInfo(
zeek::log_mgr->total_log_writer_writes_family.GetOrAdd(labels));
winfo->type = writer->Ref()->AsEnumVal();
winfo->writer = nullptr;
winfo->open_time = run_state::network_time;