From 245fd0c94fc54d2a46a8169c15ee762194391a0d Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Tue, 20 Aug 2024 13:19:46 +0200 Subject: [PATCH] broker/logging: Change threading::Value** usage std::vector instead This allows to leverage automatic memory management, less allocations and using move semantics for expressing ownership. This breaks the existing logging and broker API, but keeps the plugin DoWrite() and HookLogWrite() methods functioning. It further changes ValToLogVal to return a threading::Value rather than a threading::Value*. The vector_val and set_val fields unfortunately use the same pointer-to-array-of-pointers approach. this can'tbe changed as it'd break backwards compatibility for plugin provided input readers and log writers. --- NEWS | 9 +++ src/broker/Manager.cc | 25 +++---- src/broker/Manager.h | 12 ++-- src/logging/Manager.cc | 121 +++++++++++++++++----------------- src/logging/Manager.h | 24 +++---- src/logging/WriterBackend.cc | 29 +++++--- src/logging/WriterBackend.h | 18 ++--- src/logging/WriterFrontend.cc | 77 +++++++--------------- src/logging/WriterFrontend.h | 79 +++++++++++++++++++--- 9 files changed, 215 insertions(+), 179 deletions(-) diff --git a/NEWS b/NEWS index 2e61bae237..497138927b 100644 --- a/NEWS +++ b/NEWS @@ -16,6 +16,15 @@ Breaking Changes new ``OpaqueVal::DoSerializeData`` and ``OpaqueVal::DoUnserializeData`` methods. +* Certain internal methods on the broker and logging classes have been changed to + accept std::vector parameters instead of threading::Value** + to leverage automatic memory management, reduce the number of allocations + and use move semantics to express ownership. + + The DoWrite() and HookLogWrite() methods which can be provided by plugins + are not affected by this change, so we keep backwards compatibility with + existing log writers. + New Functionality ----------------- diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 2b7aa9bbb8..cbf9f55b91 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -680,8 +680,8 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer, const logging:: return true; } -bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int num_fields, - const threading::Value* const* vals) { +bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, const string& path, + const logging::detail::LogRecord& rec) { if ( bstate->endpoint.is_shutdown() ) return true; @@ -709,16 +709,17 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int fmt.StartWrite(); - bool success = fmt.Write(num_fields, "num_fields"); + // Cast to int for binary compatibility. + bool success = fmt.Write(static_cast(rec.size()), "num_fields"); if ( ! success ) { reporter->Error("Failed to remotely log stream %s: num_fields serialization failed", stream_id); return false; } - for ( int i = 0; i < num_fields; ++i ) { - if ( ! vals[i]->Write(&fmt) ) { - reporter->Error("Failed to remotely log stream %s: field %d serialization failed", stream_id, i); + for ( size_t i = 0; i < rec.size(); ++i ) { + if ( ! rec[i].Write(&fmt) ) { + reporter->Error("Failed to remotely log stream %s: field %zu serialization failed", stream_id, i); return false; } } @@ -1375,16 +1376,10 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { return false; } - auto vals = new threading::Value*[num_fields]; + logging::detail::LogRecord rec(num_fields); for ( int i = 0; i < num_fields; ++i ) { - vals[i] = new threading::Value; - - if ( ! vals[i]->Read(&fmt) ) { - for ( int j = 0; j <= i; ++j ) - delete vals[j]; - - delete[] vals; + if ( ! rec[i].Read(&fmt) ) { reporter->Warning("failed to unserialize remote log field %d for stream: %s", i, c_str_safe(stream_id_name).c_str()); @@ -1392,7 +1387,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { } } - log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, num_fields, vals); + log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, std::move(rec)); fmt.EndRead(); return true; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index f748f4c5b0..979d51f20a 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -223,18 +223,16 @@ public: const broker::endpoint_info& peer = NoPeer); /** - * Send a log entry to any interested peers. The topic name used is - * implicitly "bro/log/". + * Send a log entry to any interested peers. + * * @param stream the stream to which the log entry belongs. * @param writer the writer to use for outputting this log entry. * @param path the log path to output the log entry to. - * @param num_vals the number of fields to log. - * @param vals the log values to log, of size num_vals. - * See the Broker::SendFlags record type. + * @param rec the log record. * @return true if the message is sent successfully. */ - bool PublishLogWrite(EnumVal* stream, EnumVal* writer, std::string path, int num_vals, - const threading::Value* const* vals); + bool PublishLogWrite(EnumVal* stream, EnumVal* writer, const std::string& path, + const logging::detail::LogRecord& rec); /** * Automatically send an event to any interested peers whenever it is diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 5a46cbcd33..69a39822e7 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1153,21 +1153,25 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c } // Alright, can do the write now. + auto rec = RecordToLogRecord(stream, filter, columns.get()); - threading::Value** vals = RecordToFilterVals(stream, filter, columns.get()); + if ( zeek::plugin_mgr->HavePluginForHook(zeek::plugin::HOOK_LOG_WRITE) ) { + // The current HookLogWrite API takes a threading::Value**. + // Fabricate the pointer array on the fly. Mutation is allowed. + std::vector vals(rec.size()); + for ( size_t i = 0; i < rec.size(); i++ ) + vals[i] = &rec[i]; - if ( ! PLUGIN_HOOK_WITH_RESULT(HOOK_LOG_WRITE, - HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup( - filter->writer->InternalInt()), - filter->name, *info, filter->num_fields, filter->fields, vals), - true) ) { - DeleteVals(filter->num_fields, vals); + bool res = + zeek::plugin_mgr->HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup( + filter->writer->InternalInt()), + filter->name, *info, filter->num_fields, filter->fields, &vals[0]); + if ( ! res ) { + DBG_LOG(DBG_LOGGING, "Hook prevented writing to filter '%s' on stream '%s'", filter->name.c_str(), + stream->name.c_str()); -#ifdef DEBUG - DBG_LOG(DBG_LOGGING, "Hook prevented writing to filter '%s' on stream '%s'", filter->name.c_str(), - stream->name.c_str()); -#endif - return true; + return true; + } } assert(w != stream->writers.end()); @@ -1175,7 +1179,7 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c // Write takes ownership of vals. assert(writer); - writer->Write(filter->num_fields, vals); + writer->Write(std::move(rec)); #ifdef DEBUG DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'", filter->name.c_str(), stream->name.c_str()); @@ -1385,35 +1389,38 @@ bool Manager::SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t queue_size) return true; } -threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { +threading::Value Manager::ValToLogVal(std::optional& val, Type* ty) { if ( ! val ) - return new threading::Value(ty->Tag(), false); + return {ty->Tag(), false}; - threading::Value* lval = new threading::Value(ty->Tag()); + threading::Value lval{ty->Tag()}; - switch ( lval->type ) { + switch ( lval.type ) { case TYPE_BOOL: - case TYPE_INT: lval->val.int_val = val->AsInt(); break; + case TYPE_INT: lval.val.int_val = val->AsInt(); break; case TYPE_ENUM: { const char* s = ty->AsEnumType()->Lookup(val->AsInt()); if ( s ) { auto len = strlen(s); - lval->val.string_val.data = util::copy_string(s, len); - lval->val.string_val.length = len; + lval.val.string_val.data = util::copy_string(s, len); + lval.val.string_val.length = len; } else { auto err_msg = "enum type does not contain value:" + std::to_string(val->AsInt()); ty->Error(err_msg.c_str()); - lval->val.string_val.data = util::copy_string("", 0); - lval->val.string_val.length = 0; + lval.val.string_val.data = util::copy_string("", 0); + lval.val.string_val.length = 0; } break; } - case TYPE_COUNT: lval->val.uint_val = val->AsCount(); break; + case TYPE_COUNT: { + lval.val.uint_val = val->AsCount(); + break; + } case TYPE_PORT: { auto p = val->AsCount(); @@ -1427,26 +1434,26 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { else if ( pm == ICMP_PORT_MASK ) pt = TRANSPORT_ICMP; - lval->val.port_val.port = p & ~PORT_SPACE_MASK; - lval->val.port_val.proto = pt; + lval.val.port_val.port = p & ~PORT_SPACE_MASK; + lval.val.port_val.proto = pt; break; } - case TYPE_SUBNET: val->AsSubNet()->Get().ConvertToThreadingValue(&lval->val.subnet_val); break; + case TYPE_SUBNET: val->AsSubNet()->Get().ConvertToThreadingValue(&lval.val.subnet_val); break; - case TYPE_ADDR: val->AsAddr()->Get().ConvertToThreadingValue(&lval->val.addr_val); break; + case TYPE_ADDR: val->AsAddr()->Get().ConvertToThreadingValue(&lval.val.addr_val); break; case TYPE_DOUBLE: case TYPE_TIME: - case TYPE_INTERVAL: lval->val.double_val = val->AsDouble(); break; + case TYPE_INTERVAL: lval.val.double_val = val->AsDouble(); break; case TYPE_STRING: { const String* s = val->AsString()->AsString(); char* buf = new char[s->Len()]; memcpy(buf, s->Bytes(), s->Len()); - lval->val.string_val.data = buf; - lval->val.string_val.length = s->Len(); + lval.val.string_val.data = buf; + lval.val.string_val.length = s->Len(); break; } @@ -1454,8 +1461,8 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { const File* f = val->AsFile(); const char* s = f->Name(); auto len = strlen(s); - lval->val.string_val.data = util::copy_string(s, len); - lval->val.string_val.length = len; + lval.val.string_val.data = util::copy_string(s, len); + lval.val.string_val.length = len; break; } @@ -1465,8 +1472,8 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { f->Describe(&d); const char* s = d.Description(); auto len = strlen(s); - lval->val.string_val.data = util::copy_string(s, len); - lval->val.string_val.length = len; + lval.val.string_val.data = util::copy_string(s, len); + lval.val.string_val.length = len; break; } @@ -1483,12 +1490,12 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { auto& set_t = tbl_t->GetIndexTypes()[0]; bool is_managed = ZVal::IsManagedType(set_t); - lval->val.set_val.size = set->Length(); - lval->val.set_val.vals = new threading::Value*[lval->val.set_val.size]; + lval.val.set_val.size = set->Length(); + lval.val.set_val.vals = new threading::Value*[lval.val.set_val.size]; - for ( zeek_int_t i = 0; i < lval->val.set_val.size; i++ ) { + for ( zeek_int_t i = 0; i < lval.val.set_val.size; i++ ) { std::optional s_i = ZVal(set->Idx(i), set_t); - lval->val.set_val.vals[i] = ValToLogVal(s_i, set_t.get()); + lval.val.set_val.vals[i] = new threading::Value(ValToLogVal(s_i, set_t.get())); if ( is_managed ) ZVal::DeleteManagedType(*s_i); } @@ -1498,26 +1505,26 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { case TYPE_VECTOR: { VectorVal* vec = val->AsVector(); - lval->val.vector_val.size = vec->Size(); - lval->val.vector_val.vals = new threading::Value*[lval->val.vector_val.size]; + lval.val.vector_val.size = vec->Size(); + lval.val.vector_val.vals = new threading::Value*[lval.val.vector_val.size]; auto& vv = vec->RawVec(); auto& vt = vec->GetType()->Yield(); - for ( zeek_int_t i = 0; i < lval->val.vector_val.size; i++ ) { - lval->val.vector_val.vals[i] = ValToLogVal(vv[i], vt.get()); + for ( zeek_int_t i = 0; i < lval.val.vector_val.size; i++ ) { + lval.val.vector_val.vals[i] = new threading::Value(ValToLogVal(vv[i], vt.get())); } break; } - default: reporter->InternalError("unsupported type %s for log_write", type_name(lval->type)); + default: reporter->InternalError("unsupported type %s for log_write", type_name(lval.type)); } return lval; } -threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns) { +detail::LogRecord Manager::RecordToLogRecord(const Stream* stream, Filter* filter, RecordVal* columns) { RecordValPtr ext_rec; if ( filter->num_ext_fields > 0 ) { @@ -1527,7 +1534,9 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil ext_rec = {AdoptRef{}, res.release()->AsRecordVal()}; } - threading::Value** vals = new threading::Value*[filter->num_fields]; + // Allocate storage for all vals. + detail::LogRecord vals; + vals.reserve(filter->num_fields); for ( int i = 0; i < filter->num_fields; ++i ) { std::optional val; @@ -1535,7 +1544,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil if ( i < filter->num_ext_fields ) { if ( ! ext_rec ) { // executing function did not return record. Send empty for all vals. - vals[i] = new threading::Value(filter->fields[i]->type, false); + vals.emplace_back(filter->fields[i]->type, false); continue; } @@ -1557,7 +1566,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil if ( ! val ) { // Value, or any of its parents, is not set. - vals[i] = new threading::Value(filter->fields[i]->type, false); + vals.emplace_back(filter->fields[i]->type, false); break; } @@ -1565,7 +1574,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil } if ( val ) - vals[i] = ValToLogVal(val, vt); + vals.emplace_back(ValToLogVal(val, vt)); } return vals; @@ -1688,16 +1697,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken return winfo->writer; } -void Manager::DeleteVals(int num_fields, threading::Value** vals) { - // Note this code is duplicated in WriterBackend::DeleteVals(). - for ( int i = 0; i < num_fields; i++ ) - delete vals[i]; - - delete[] vals; -} - -bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, int num_fields, - threading::Value** vals) { +bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, detail::LogRecord&& rec) { Stream* stream = FindStream(id); if ( ! stream ) { @@ -1707,12 +1707,10 @@ bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, id->Describe(&desc); DBG_LOG(DBG_LOGGING, "unknown stream %s in Manager::Write()", desc.Description()); #endif - DeleteVals(num_fields, vals); return false; } if ( ! stream->enabled ) { - DeleteVals(num_fields, vals); return true; } @@ -1725,11 +1723,10 @@ bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, id->Describe(&desc); DBG_LOG(DBG_LOGGING, "unknown writer %s in Manager::Write()", desc.Description()); #endif - DeleteVals(num_fields, vals); return false; } - w->second->writer->Write(num_fields, vals); + w->second->writer->Write(std::move(rec)); DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to path '%s' on stream '%s'", path.c_str(), stream->name.c_str()); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 43b13892aa..81eedb47ef 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -268,9 +268,10 @@ public: const threading::Field* const* fields); /** - * Writes out log entries that have already passed through all - * filters (and have raised any events). This is meant called for logs - * received already processed from remote. + * Writes out log entries received from remote nodes. + * + * The given record has passed through all policy filters and raised events + * on the sending node. It's only meant to be written out. * * @param stream The enum value corresponding to the log stream. * @@ -278,13 +279,11 @@ public: * * @param path The path of the target log stream to write to. * - * @param num_fields The number of log values to write. - * - * @param vals An array of log values to write, of size num_fields. - * The method takes ownership of the array. + * @param rec Representation of the log record to write. + + * @return Returns true if the record was processed successfully. */ - bool WriteFromRemote(EnumVal* stream, EnumVal* writer, const std::string& path, int num_fields, - threading::Value** vals); + bool WriteFromRemote(EnumVal* id, EnumVal* writer, const std::string& path, detail::LogRecord&& rec); /** * Announces all instantiated writers to a given Broker peer. @@ -365,9 +364,6 @@ protected: bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name, double open, double close, bool success, bool terminating); - // Deletes the values as passed into Write(). - void DeleteVals(int num_fields, threading::Value** vals); - private: struct Filter; struct Stream; @@ -376,9 +372,9 @@ private: bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, const std::string& path, const std::list& indices); - threading::Value** RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns); + detail::LogRecord RecordToLogRecord(const Stream* stream, Filter* filter, RecordVal* columns); + threading::Value ValToLogVal(std::optional& val, Type* ty); - threading::Value* ValToLogVal(std::optional& val, Type* ty); Stream* FindStream(EnumVal* id); void RemoveDisabledWriters(Stream* stream); void InstallRotationTimer(WriterInfo* winfo); diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 30a12e74f0..e3b64c89aa 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -181,7 +181,7 @@ bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields) { return true; } -bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) { +bool WriterBackend::Write(int arg_num_fields, zeek::Span records) { // Double-check that the arguments match. If we get this from remote, // something might be mixed up. if ( num_fields != arg_num_fields ) { @@ -191,22 +191,20 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) { Debug(DBG_LOGGING, msg); #endif - DeleteVals(num_writes, vals); DisableFrontend(); return false; } // Double-check all the types match. - for ( int j = 0; j < num_writes; j++ ) { + for ( size_t j = 0; j < records.size(); j++ ) { for ( int i = 0; i < num_fields; ++i ) { - if ( vals[j][i]->type != fields[i]->type ) { + if ( records[j][i].type != fields[i]->type ) { #ifdef DEBUG const char* msg = Fmt("Field #%d type doesn't match in WriterBackend::Write() (%d vs. %d)", i, - vals[j][i]->type, fields[i]->type); + records[j][i].type, fields[i]->type); Debug(DBG_LOGGING, msg); #endif DisableFrontend(); - DeleteVals(num_writes, vals); return false; } } @@ -215,16 +213,27 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) { bool success = true; if ( ! Failed() ) { - for ( int j = 0; j < num_writes; j++ ) { - success = DoWrite(num_fields, fields, vals[j]); + // Populate a Value* array for backwards compat with plugin + // provided WriterBackend implementations that expect to + // receive a threading::Value**. + // + // We keep the raw pointer for this API, as threading::Value + // itself manages strings, sets and vectors using raw pointers, + // so this seems more consistent than mixing. + std::vector valps(num_fields); + + for ( size_t j = 0; j < records.size(); j++ ) { + auto& write_vals = records[j]; + for ( int f = 0; f < num_fields; f++ ) + valps[f] = &write_vals[f]; + + success = DoWrite(num_fields, fields, &valps[0]); if ( ! success ) break; } } - DeleteVals(num_writes, vals); - if ( ! success ) DisableFrontend(); diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index 9902d9f429..3cb09d2584 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -4,6 +4,7 @@ #pragma once +#include "zeek/Span.h" #include "zeek/logging/Component.h" #include "zeek/threading/MsgThread.h" @@ -13,6 +14,12 @@ class data; namespace zeek::logging { +namespace detail { + +using LogRecord = std::vector; + +} + class WriterFrontend; /** @@ -137,21 +144,16 @@ public: bool Init(int num_fields, const threading::Field* const* fields); /** - * Writes one log entry. + * Write a batch of log records. * * @param num_fields: The number of log fields for this stream. The * value must match what was passed to Init(). * - * @param An array of size \a num_fields with the log values. Their - * types must match with the field passed to Init(). The method - * takes ownership of \a vals.. - * - * Returns false if an error occurred, in which case the writer must - * not be used any further. + * @param records Span of LogRecord instances to write out. * * @return False if an error occurred. */ - bool Write(int num_fields, int num_writes, threading::Value*** vals); + bool Write(int arg_num_fields, zeek::Span records); /** * Sets the buffering status for the writer, assuming the writer diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index d1fae0d3f9..49f7960cd3 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -1,6 +1,7 @@ #include "zeek/logging/WriterFrontend.h" #include "zeek/RunState.h" +#include "zeek/Span.h" #include "zeek/broker/Manager.h" #include "zeek/logging/Manager.h" #include "zeek/logging/WriterBackend.h" @@ -50,18 +51,16 @@ private: class WriteMessage final : public threading::InputMessage { public: - WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals) + WriteMessage(WriterBackend* backend, int num_fields, std::vector&& records) : threading::InputMessage("Write", backend), num_fields(num_fields), - num_writes(num_writes), - vals(vals) {} + records(std::move(records)) {} - bool Process() override { return Object()->Write(num_fields, num_writes, vals); } + bool Process() override { return Object()->Write(num_fields, zeek::Span{records}); } private: int num_fields; - int num_writes; - Value*** vals; + std::vector records; }; class SetBufMessage final : public threading::InputMessage { @@ -89,7 +88,8 @@ private: // Frontend methods. WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, - bool arg_local, bool arg_remote) { + bool arg_local, bool arg_remote) + : write_buffer(detail::WriteBuffer(WRITER_BUFFER_SIZE)) { stream = arg_stream; writer = arg_writer; Ref(stream); @@ -99,8 +99,6 @@ WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVa buf = true; local = arg_local; remote = arg_remote; - write_buffer = nullptr; - write_buffer_pos = 0; info = new WriterBackend::WriterInfo(arg_info); num_fields = 0; @@ -173,37 +171,28 @@ void WriterFrontend::Init(int arg_num_fields, const Field* const* arg_fields) { } } -void WriterFrontend::Write(int arg_num_fields, Value** vals) { - if ( disabled ) { - DeleteVals(arg_num_fields, vals); - return; - } +void WriterFrontend::Write(detail::LogRecord&& arg_vals) { + std::vector vals = std::move(arg_vals); - if ( arg_num_fields != num_fields ) { - reporter->Warning("WriterFrontend %s expected %d fields in write, got %d. Skipping line.", name, num_fields, - arg_num_fields); - DeleteVals(arg_num_fields, vals); + if ( disabled ) + return; + + if ( vals.size() != static_cast(num_fields) ) { + reporter->Warning("WriterFrontend %s expected %d fields in write, got %zu. Skipping line.", name, num_fields, + vals.size()); return; } if ( remote ) { - broker_mgr->PublishLogWrite(stream, writer, info->path, num_fields, vals); + broker_mgr->PublishLogWrite(stream, writer, info->path, vals); } - if ( ! backend ) { - DeleteVals(arg_num_fields, vals); + if ( ! backend ) return; - } - if ( ! write_buffer ) { - // Need new buffer. - write_buffer = new Value**[WRITER_BUFFER_SIZE]; - write_buffer_pos = 0; - } + write_buffer.WriteRecord(std::move(vals)); - write_buffer[write_buffer_pos++] = vals; - - if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf || run_state::terminating ) + if ( write_buffer.Full() || ! buf || run_state::terminating ) // Buffer full (or no buffering desired or terminating). FlushWriteBuffer(); } @@ -214,16 +203,12 @@ void WriterFrontend::FlushWriteBuffer() { return; } - if ( ! write_buffer_pos ) + if ( write_buffer.Empty() ) // Nothing to do. return; if ( backend ) - backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); - - // Clear buffer (no delete, we pass ownership to child thread.) - write_buffer = nullptr; - write_buffer_pos = 0; + backend->SendIn(new WriteMessage(backend, num_fields, std::move(write_buffer).TakeRecords())); } void WriterFrontend::SetBuf(bool enabled) { @@ -263,24 +248,6 @@ void WriterFrontend::Rotate(const char* rotated_path, double open, double close, log_mgr->FinishedRotation(this, nullptr, nullptr, 0, 0, false, terminating); } -void WriterFrontend::DeleteVals(int num_fields, Value** vals) { - // Note this code is duplicated in Manager::DeleteVals(). - for ( int i = 0; i < num_fields; i++ ) - delete vals[i]; - - delete[] vals; -} - -void WriterFrontend::CleanupWriteBuffer() { - if ( ! write_buffer || write_buffer_pos == 0 ) - return; - - for ( int j = 0; j < write_buffer_pos; j++ ) - DeleteVals(num_fields, write_buffer[j]); - - delete[] write_buffer; - write_buffer = nullptr; - write_buffer_pos = 0; -} +void WriterFrontend::CleanupWriteBuffer() { write_buffer.Clear(); } } // namespace zeek::logging diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index dfd984b502..af860de22c 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -8,6 +8,71 @@ namespace zeek::logging { class Manager; + +namespace detail { + +/** + * Implements a buffer accumulating log records in \a WriterFrontend instance + * before passing them to \a WriterBackend instances. + * + * \see WriterFrontend::Write + */ +class WriteBuffer { +public: + /** + * Constructor. + */ + explicit WriteBuffer(size_t buffer_size) : buffer_size(buffer_size) {} + + /** + * Push a record to the buffer. + * + * @param record The records vals. + */ + void WriteRecord(LogRecord&& record) { records.emplace_back(std::move(record)); } + + /** + * Moves the records out of the buffer and resets it. + * + * @return The currently buffered log records. + */ + std::vector TakeRecords() && { + auto tmp = std::move(records); + + // Re-initialize the buffer. + records.clear(); + records.reserve(buffer_size); + + return tmp; + } + + /** + * @return The size of the buffer. + */ + size_t Size() const { return records.size(); } + + /** + * @return True if buffer is empty. + */ + size_t Empty() const { return records.empty(); } + + /** + * @return True if size equals or exceeds configured buffer size. + */ + bool Full() const { return records.size() >= buffer_size; } + + /** + * Clear the records buffer. + */ + void Clear() { records.clear(); } + +private: + size_t buffer_size; + std::vector records; +}; + +} // namespace detail + /** * Bridge class between the logging::Manager and backend writer threads. The * Manager instantiates one \a WriterFrontend for each open logging filter. @@ -84,13 +149,14 @@ public: * FlushWriteBuffer(). The backend writer triggers this with a * message at every heartbeat. * - * See WriterBackend::Writer() for arguments (except that this method - * takes only a single record, not an array). The method takes - * ownership of \a vals. + * If the frontend has remote logging enabled, the record is also + * published to interested peers. * + * @param rec Representation of the log record. Callee takes ownership. + * This method must only be called from the main thread. */ - void Write(int num_fields, threading::Value** vals); + void Write(detail::LogRecord&& rec); /** * Sets the buffering state. @@ -185,8 +251,6 @@ public: protected: friend class Manager; - void DeleteVals(int num_fields, threading::Value** vals); - EnumVal* stream; EnumVal* writer; @@ -204,8 +268,7 @@ protected: // Buffer for bulk writes. static const int WRITER_BUFFER_SIZE = 1000; - int write_buffer_pos; // Position of next write in buffer. - threading::Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE. + detail::WriteBuffer write_buffer; // Buffer of size WRITER_BUFFER_SIZE. private: void CleanupWriteBuffer();