diff --git a/NEWS b/NEWS index 2e61bae237..497138927b 100644 --- a/NEWS +++ b/NEWS @@ -16,6 +16,15 @@ Breaking Changes new ``OpaqueVal::DoSerializeData`` and ``OpaqueVal::DoUnserializeData`` methods. +* Certain internal methods on the broker and logging classes have been changed to + accept std::vector parameters instead of threading::Value** + to leverage automatic memory management, reduce the number of allocations + and use move semantics to express ownership. + + The DoWrite() and HookLogWrite() methods which can be provided by plugins + are not affected by this change, so we keep backwards compatibility with + existing log writers. + New Functionality ----------------- diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 2b7aa9bbb8..cbf9f55b91 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -680,8 +680,8 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer, const logging:: return true; } -bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int num_fields, - const threading::Value* const* vals) { +bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, const string& path, + const logging::detail::LogRecord& rec) { if ( bstate->endpoint.is_shutdown() ) return true; @@ -709,16 +709,17 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int fmt.StartWrite(); - bool success = fmt.Write(num_fields, "num_fields"); + // Cast to int for binary compatibility. + bool success = fmt.Write(static_cast(rec.size()), "num_fields"); if ( ! success ) { reporter->Error("Failed to remotely log stream %s: num_fields serialization failed", stream_id); return false; } - for ( int i = 0; i < num_fields; ++i ) { - if ( ! vals[i]->Write(&fmt) ) { - reporter->Error("Failed to remotely log stream %s: field %d serialization failed", stream_id, i); + for ( size_t i = 0; i < rec.size(); ++i ) { + if ( ! rec[i].Write(&fmt) ) { + reporter->Error("Failed to remotely log stream %s: field %zu serialization failed", stream_id, i); return false; } } @@ -1375,16 +1376,10 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { return false; } - auto vals = new threading::Value*[num_fields]; + logging::detail::LogRecord rec(num_fields); for ( int i = 0; i < num_fields; ++i ) { - vals[i] = new threading::Value; - - if ( ! vals[i]->Read(&fmt) ) { - for ( int j = 0; j <= i; ++j ) - delete vals[j]; - - delete[] vals; + if ( ! rec[i].Read(&fmt) ) { reporter->Warning("failed to unserialize remote log field %d for stream: %s", i, c_str_safe(stream_id_name).c_str()); @@ -1392,7 +1387,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) { } } - log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, num_fields, vals); + log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, std::move(rec)); fmt.EndRead(); return true; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index f748f4c5b0..979d51f20a 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -223,18 +223,16 @@ public: const broker::endpoint_info& peer = NoPeer); /** - * Send a log entry to any interested peers. The topic name used is - * implicitly "bro/log/". + * Send a log entry to any interested peers. + * * @param stream the stream to which the log entry belongs. * @param writer the writer to use for outputting this log entry. * @param path the log path to output the log entry to. - * @param num_vals the number of fields to log. - * @param vals the log values to log, of size num_vals. - * See the Broker::SendFlags record type. + * @param rec the log record. * @return true if the message is sent successfully. */ - bool PublishLogWrite(EnumVal* stream, EnumVal* writer, std::string path, int num_vals, - const threading::Value* const* vals); + bool PublishLogWrite(EnumVal* stream, EnumVal* writer, const std::string& path, + const logging::detail::LogRecord& rec); /** * Automatically send an event to any interested peers whenever it is diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 5a46cbcd33..69a39822e7 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1153,21 +1153,25 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c } // Alright, can do the write now. + auto rec = RecordToLogRecord(stream, filter, columns.get()); - threading::Value** vals = RecordToFilterVals(stream, filter, columns.get()); + if ( zeek::plugin_mgr->HavePluginForHook(zeek::plugin::HOOK_LOG_WRITE) ) { + // The current HookLogWrite API takes a threading::Value**. + // Fabricate the pointer array on the fly. Mutation is allowed. + std::vector vals(rec.size()); + for ( size_t i = 0; i < rec.size(); i++ ) + vals[i] = &rec[i]; - if ( ! PLUGIN_HOOK_WITH_RESULT(HOOK_LOG_WRITE, - HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup( - filter->writer->InternalInt()), - filter->name, *info, filter->num_fields, filter->fields, vals), - true) ) { - DeleteVals(filter->num_fields, vals); + bool res = + zeek::plugin_mgr->HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup( + filter->writer->InternalInt()), + filter->name, *info, filter->num_fields, filter->fields, &vals[0]); + if ( ! res ) { + DBG_LOG(DBG_LOGGING, "Hook prevented writing to filter '%s' on stream '%s'", filter->name.c_str(), + stream->name.c_str()); -#ifdef DEBUG - DBG_LOG(DBG_LOGGING, "Hook prevented writing to filter '%s' on stream '%s'", filter->name.c_str(), - stream->name.c_str()); -#endif - return true; + return true; + } } assert(w != stream->writers.end()); @@ -1175,7 +1179,7 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c // Write takes ownership of vals. assert(writer); - writer->Write(filter->num_fields, vals); + writer->Write(std::move(rec)); #ifdef DEBUG DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'", filter->name.c_str(), stream->name.c_str()); @@ -1385,35 +1389,38 @@ bool Manager::SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t queue_size) return true; } -threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { +threading::Value Manager::ValToLogVal(std::optional& val, Type* ty) { if ( ! val ) - return new threading::Value(ty->Tag(), false); + return {ty->Tag(), false}; - threading::Value* lval = new threading::Value(ty->Tag()); + threading::Value lval{ty->Tag()}; - switch ( lval->type ) { + switch ( lval.type ) { case TYPE_BOOL: - case TYPE_INT: lval->val.int_val = val->AsInt(); break; + case TYPE_INT: lval.val.int_val = val->AsInt(); break; case TYPE_ENUM: { const char* s = ty->AsEnumType()->Lookup(val->AsInt()); if ( s ) { auto len = strlen(s); - lval->val.string_val.data = util::copy_string(s, len); - lval->val.string_val.length = len; + lval.val.string_val.data = util::copy_string(s, len); + lval.val.string_val.length = len; } else { auto err_msg = "enum type does not contain value:" + std::to_string(val->AsInt()); ty->Error(err_msg.c_str()); - lval->val.string_val.data = util::copy_string("", 0); - lval->val.string_val.length = 0; + lval.val.string_val.data = util::copy_string("", 0); + lval.val.string_val.length = 0; } break; } - case TYPE_COUNT: lval->val.uint_val = val->AsCount(); break; + case TYPE_COUNT: { + lval.val.uint_val = val->AsCount(); + break; + } case TYPE_PORT: { auto p = val->AsCount(); @@ -1427,26 +1434,26 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { else if ( pm == ICMP_PORT_MASK ) pt = TRANSPORT_ICMP; - lval->val.port_val.port = p & ~PORT_SPACE_MASK; - lval->val.port_val.proto = pt; + lval.val.port_val.port = p & ~PORT_SPACE_MASK; + lval.val.port_val.proto = pt; break; } - case TYPE_SUBNET: val->AsSubNet()->Get().ConvertToThreadingValue(&lval->val.subnet_val); break; + case TYPE_SUBNET: val->AsSubNet()->Get().ConvertToThreadingValue(&lval.val.subnet_val); break; - case TYPE_ADDR: val->AsAddr()->Get().ConvertToThreadingValue(&lval->val.addr_val); break; + case TYPE_ADDR: val->AsAddr()->Get().ConvertToThreadingValue(&lval.val.addr_val); break; case TYPE_DOUBLE: case TYPE_TIME: - case TYPE_INTERVAL: lval->val.double_val = val->AsDouble(); break; + case TYPE_INTERVAL: lval.val.double_val = val->AsDouble(); break; case TYPE_STRING: { const String* s = val->AsString()->AsString(); char* buf = new char[s->Len()]; memcpy(buf, s->Bytes(), s->Len()); - lval->val.string_val.data = buf; - lval->val.string_val.length = s->Len(); + lval.val.string_val.data = buf; + lval.val.string_val.length = s->Len(); break; } @@ -1454,8 +1461,8 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { const File* f = val->AsFile(); const char* s = f->Name(); auto len = strlen(s); - lval->val.string_val.data = util::copy_string(s, len); - lval->val.string_val.length = len; + lval.val.string_val.data = util::copy_string(s, len); + lval.val.string_val.length = len; break; } @@ -1465,8 +1472,8 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { f->Describe(&d); const char* s = d.Description(); auto len = strlen(s); - lval->val.string_val.data = util::copy_string(s, len); - lval->val.string_val.length = len; + lval.val.string_val.data = util::copy_string(s, len); + lval.val.string_val.length = len; break; } @@ -1483,12 +1490,12 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { auto& set_t = tbl_t->GetIndexTypes()[0]; bool is_managed = ZVal::IsManagedType(set_t); - lval->val.set_val.size = set->Length(); - lval->val.set_val.vals = new threading::Value*[lval->val.set_val.size]; + lval.val.set_val.size = set->Length(); + lval.val.set_val.vals = new threading::Value*[lval.val.set_val.size]; - for ( zeek_int_t i = 0; i < lval->val.set_val.size; i++ ) { + for ( zeek_int_t i = 0; i < lval.val.set_val.size; i++ ) { std::optional s_i = ZVal(set->Idx(i), set_t); - lval->val.set_val.vals[i] = ValToLogVal(s_i, set_t.get()); + lval.val.set_val.vals[i] = new threading::Value(ValToLogVal(s_i, set_t.get())); if ( is_managed ) ZVal::DeleteManagedType(*s_i); } @@ -1498,26 +1505,26 @@ threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { case TYPE_VECTOR: { VectorVal* vec = val->AsVector(); - lval->val.vector_val.size = vec->Size(); - lval->val.vector_val.vals = new threading::Value*[lval->val.vector_val.size]; + lval.val.vector_val.size = vec->Size(); + lval.val.vector_val.vals = new threading::Value*[lval.val.vector_val.size]; auto& vv = vec->RawVec(); auto& vt = vec->GetType()->Yield(); - for ( zeek_int_t i = 0; i < lval->val.vector_val.size; i++ ) { - lval->val.vector_val.vals[i] = ValToLogVal(vv[i], vt.get()); + for ( zeek_int_t i = 0; i < lval.val.vector_val.size; i++ ) { + lval.val.vector_val.vals[i] = new threading::Value(ValToLogVal(vv[i], vt.get())); } break; } - default: reporter->InternalError("unsupported type %s for log_write", type_name(lval->type)); + default: reporter->InternalError("unsupported type %s for log_write", type_name(lval.type)); } return lval; } -threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns) { +detail::LogRecord Manager::RecordToLogRecord(const Stream* stream, Filter* filter, RecordVal* columns) { RecordValPtr ext_rec; if ( filter->num_ext_fields > 0 ) { @@ -1527,7 +1534,9 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil ext_rec = {AdoptRef{}, res.release()->AsRecordVal()}; } - threading::Value** vals = new threading::Value*[filter->num_fields]; + // Allocate storage for all vals. + detail::LogRecord vals; + vals.reserve(filter->num_fields); for ( int i = 0; i < filter->num_fields; ++i ) { std::optional val; @@ -1535,7 +1544,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil if ( i < filter->num_ext_fields ) { if ( ! ext_rec ) { // executing function did not return record. Send empty for all vals. - vals[i] = new threading::Value(filter->fields[i]->type, false); + vals.emplace_back(filter->fields[i]->type, false); continue; } @@ -1557,7 +1566,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil if ( ! val ) { // Value, or any of its parents, is not set. - vals[i] = new threading::Value(filter->fields[i]->type, false); + vals.emplace_back(filter->fields[i]->type, false); break; } @@ -1565,7 +1574,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil } if ( val ) - vals[i] = ValToLogVal(val, vt); + vals.emplace_back(ValToLogVal(val, vt)); } return vals; @@ -1688,16 +1697,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken return winfo->writer; } -void Manager::DeleteVals(int num_fields, threading::Value** vals) { - // Note this code is duplicated in WriterBackend::DeleteVals(). - for ( int i = 0; i < num_fields; i++ ) - delete vals[i]; - - delete[] vals; -} - -bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, int num_fields, - threading::Value** vals) { +bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, detail::LogRecord&& rec) { Stream* stream = FindStream(id); if ( ! stream ) { @@ -1707,12 +1707,10 @@ bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, id->Describe(&desc); DBG_LOG(DBG_LOGGING, "unknown stream %s in Manager::Write()", desc.Description()); #endif - DeleteVals(num_fields, vals); return false; } if ( ! stream->enabled ) { - DeleteVals(num_fields, vals); return true; } @@ -1725,11 +1723,10 @@ bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, id->Describe(&desc); DBG_LOG(DBG_LOGGING, "unknown writer %s in Manager::Write()", desc.Description()); #endif - DeleteVals(num_fields, vals); return false; } - w->second->writer->Write(num_fields, vals); + w->second->writer->Write(std::move(rec)); DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to path '%s' on stream '%s'", path.c_str(), stream->name.c_str()); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 43b13892aa..81eedb47ef 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -268,9 +268,10 @@ public: const threading::Field* const* fields); /** - * Writes out log entries that have already passed through all - * filters (and have raised any events). This is meant called for logs - * received already processed from remote. + * Writes out log entries received from remote nodes. + * + * The given record has passed through all policy filters and raised events + * on the sending node. It's only meant to be written out. * * @param stream The enum value corresponding to the log stream. * @@ -278,13 +279,11 @@ public: * * @param path The path of the target log stream to write to. * - * @param num_fields The number of log values to write. - * - * @param vals An array of log values to write, of size num_fields. - * The method takes ownership of the array. + * @param rec Representation of the log record to write. + + * @return Returns true if the record was processed successfully. */ - bool WriteFromRemote(EnumVal* stream, EnumVal* writer, const std::string& path, int num_fields, - threading::Value** vals); + bool WriteFromRemote(EnumVal* id, EnumVal* writer, const std::string& path, detail::LogRecord&& rec); /** * Announces all instantiated writers to a given Broker peer. @@ -365,9 +364,6 @@ protected: bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name, double open, double close, bool success, bool terminating); - // Deletes the values as passed into Write(). - void DeleteVals(int num_fields, threading::Value** vals); - private: struct Filter; struct Stream; @@ -376,9 +372,9 @@ private: bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, const std::string& path, const std::list& indices); - threading::Value** RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns); + detail::LogRecord RecordToLogRecord(const Stream* stream, Filter* filter, RecordVal* columns); + threading::Value ValToLogVal(std::optional& val, Type* ty); - threading::Value* ValToLogVal(std::optional& val, Type* ty); Stream* FindStream(EnumVal* id); void RemoveDisabledWriters(Stream* stream); void InstallRotationTimer(WriterInfo* winfo); diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 30a12e74f0..e3b64c89aa 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -181,7 +181,7 @@ bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields) { return true; } -bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) { +bool WriterBackend::Write(int arg_num_fields, zeek::Span records) { // Double-check that the arguments match. If we get this from remote, // something might be mixed up. if ( num_fields != arg_num_fields ) { @@ -191,22 +191,20 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) { Debug(DBG_LOGGING, msg); #endif - DeleteVals(num_writes, vals); DisableFrontend(); return false; } // Double-check all the types match. - for ( int j = 0; j < num_writes; j++ ) { + for ( size_t j = 0; j < records.size(); j++ ) { for ( int i = 0; i < num_fields; ++i ) { - if ( vals[j][i]->type != fields[i]->type ) { + if ( records[j][i].type != fields[i]->type ) { #ifdef DEBUG const char* msg = Fmt("Field #%d type doesn't match in WriterBackend::Write() (%d vs. %d)", i, - vals[j][i]->type, fields[i]->type); + records[j][i].type, fields[i]->type); Debug(DBG_LOGGING, msg); #endif DisableFrontend(); - DeleteVals(num_writes, vals); return false; } } @@ -215,16 +213,27 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) { bool success = true; if ( ! Failed() ) { - for ( int j = 0; j < num_writes; j++ ) { - success = DoWrite(num_fields, fields, vals[j]); + // Populate a Value* array for backwards compat with plugin + // provided WriterBackend implementations that expect to + // receive a threading::Value**. + // + // We keep the raw pointer for this API, as threading::Value + // itself manages strings, sets and vectors using raw pointers, + // so this seems more consistent than mixing. + std::vector valps(num_fields); + + for ( size_t j = 0; j < records.size(); j++ ) { + auto& write_vals = records[j]; + for ( int f = 0; f < num_fields; f++ ) + valps[f] = &write_vals[f]; + + success = DoWrite(num_fields, fields, &valps[0]); if ( ! success ) break; } } - DeleteVals(num_writes, vals); - if ( ! success ) DisableFrontend(); diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index 9902d9f429..3cb09d2584 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -4,6 +4,7 @@ #pragma once +#include "zeek/Span.h" #include "zeek/logging/Component.h" #include "zeek/threading/MsgThread.h" @@ -13,6 +14,12 @@ class data; namespace zeek::logging { +namespace detail { + +using LogRecord = std::vector; + +} + class WriterFrontend; /** @@ -137,21 +144,16 @@ public: bool Init(int num_fields, const threading::Field* const* fields); /** - * Writes one log entry. + * Write a batch of log records. * * @param num_fields: The number of log fields for this stream. The * value must match what was passed to Init(). * - * @param An array of size \a num_fields with the log values. Their - * types must match with the field passed to Init(). The method - * takes ownership of \a vals.. - * - * Returns false if an error occurred, in which case the writer must - * not be used any further. + * @param records Span of LogRecord instances to write out. * * @return False if an error occurred. */ - bool Write(int num_fields, int num_writes, threading::Value*** vals); + bool Write(int arg_num_fields, zeek::Span records); /** * Sets the buffering status for the writer, assuming the writer diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index d1fae0d3f9..49f7960cd3 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -1,6 +1,7 @@ #include "zeek/logging/WriterFrontend.h" #include "zeek/RunState.h" +#include "zeek/Span.h" #include "zeek/broker/Manager.h" #include "zeek/logging/Manager.h" #include "zeek/logging/WriterBackend.h" @@ -50,18 +51,16 @@ private: class WriteMessage final : public threading::InputMessage { public: - WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals) + WriteMessage(WriterBackend* backend, int num_fields, std::vector&& records) : threading::InputMessage("Write", backend), num_fields(num_fields), - num_writes(num_writes), - vals(vals) {} + records(std::move(records)) {} - bool Process() override { return Object()->Write(num_fields, num_writes, vals); } + bool Process() override { return Object()->Write(num_fields, zeek::Span{records}); } private: int num_fields; - int num_writes; - Value*** vals; + std::vector records; }; class SetBufMessage final : public threading::InputMessage { @@ -89,7 +88,8 @@ private: // Frontend methods. WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, - bool arg_local, bool arg_remote) { + bool arg_local, bool arg_remote) + : write_buffer(detail::WriteBuffer(WRITER_BUFFER_SIZE)) { stream = arg_stream; writer = arg_writer; Ref(stream); @@ -99,8 +99,6 @@ WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVa buf = true; local = arg_local; remote = arg_remote; - write_buffer = nullptr; - write_buffer_pos = 0; info = new WriterBackend::WriterInfo(arg_info); num_fields = 0; @@ -173,37 +171,28 @@ void WriterFrontend::Init(int arg_num_fields, const Field* const* arg_fields) { } } -void WriterFrontend::Write(int arg_num_fields, Value** vals) { - if ( disabled ) { - DeleteVals(arg_num_fields, vals); - return; - } +void WriterFrontend::Write(detail::LogRecord&& arg_vals) { + std::vector vals = std::move(arg_vals); - if ( arg_num_fields != num_fields ) { - reporter->Warning("WriterFrontend %s expected %d fields in write, got %d. Skipping line.", name, num_fields, - arg_num_fields); - DeleteVals(arg_num_fields, vals); + if ( disabled ) + return; + + if ( vals.size() != static_cast(num_fields) ) { + reporter->Warning("WriterFrontend %s expected %d fields in write, got %zu. Skipping line.", name, num_fields, + vals.size()); return; } if ( remote ) { - broker_mgr->PublishLogWrite(stream, writer, info->path, num_fields, vals); + broker_mgr->PublishLogWrite(stream, writer, info->path, vals); } - if ( ! backend ) { - DeleteVals(arg_num_fields, vals); + if ( ! backend ) return; - } - if ( ! write_buffer ) { - // Need new buffer. - write_buffer = new Value**[WRITER_BUFFER_SIZE]; - write_buffer_pos = 0; - } + write_buffer.WriteRecord(std::move(vals)); - write_buffer[write_buffer_pos++] = vals; - - if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf || run_state::terminating ) + if ( write_buffer.Full() || ! buf || run_state::terminating ) // Buffer full (or no buffering desired or terminating). FlushWriteBuffer(); } @@ -214,16 +203,12 @@ void WriterFrontend::FlushWriteBuffer() { return; } - if ( ! write_buffer_pos ) + if ( write_buffer.Empty() ) // Nothing to do. return; if ( backend ) - backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer)); - - // Clear buffer (no delete, we pass ownership to child thread.) - write_buffer = nullptr; - write_buffer_pos = 0; + backend->SendIn(new WriteMessage(backend, num_fields, std::move(write_buffer).TakeRecords())); } void WriterFrontend::SetBuf(bool enabled) { @@ -263,24 +248,6 @@ void WriterFrontend::Rotate(const char* rotated_path, double open, double close, log_mgr->FinishedRotation(this, nullptr, nullptr, 0, 0, false, terminating); } -void WriterFrontend::DeleteVals(int num_fields, Value** vals) { - // Note this code is duplicated in Manager::DeleteVals(). - for ( int i = 0; i < num_fields; i++ ) - delete vals[i]; - - delete[] vals; -} - -void WriterFrontend::CleanupWriteBuffer() { - if ( ! write_buffer || write_buffer_pos == 0 ) - return; - - for ( int j = 0; j < write_buffer_pos; j++ ) - DeleteVals(num_fields, write_buffer[j]); - - delete[] write_buffer; - write_buffer = nullptr; - write_buffer_pos = 0; -} +void WriterFrontend::CleanupWriteBuffer() { write_buffer.Clear(); } } // namespace zeek::logging diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index dfd984b502..af860de22c 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -8,6 +8,71 @@ namespace zeek::logging { class Manager; + +namespace detail { + +/** + * Implements a buffer accumulating log records in \a WriterFrontend instance + * before passing them to \a WriterBackend instances. + * + * \see WriterFrontend::Write + */ +class WriteBuffer { +public: + /** + * Constructor. + */ + explicit WriteBuffer(size_t buffer_size) : buffer_size(buffer_size) {} + + /** + * Push a record to the buffer. + * + * @param record The records vals. + */ + void WriteRecord(LogRecord&& record) { records.emplace_back(std::move(record)); } + + /** + * Moves the records out of the buffer and resets it. + * + * @return The currently buffered log records. + */ + std::vector TakeRecords() && { + auto tmp = std::move(records); + + // Re-initialize the buffer. + records.clear(); + records.reserve(buffer_size); + + return tmp; + } + + /** + * @return The size of the buffer. + */ + size_t Size() const { return records.size(); } + + /** + * @return True if buffer is empty. + */ + size_t Empty() const { return records.empty(); } + + /** + * @return True if size equals or exceeds configured buffer size. + */ + bool Full() const { return records.size() >= buffer_size; } + + /** + * Clear the records buffer. + */ + void Clear() { records.clear(); } + +private: + size_t buffer_size; + std::vector records; +}; + +} // namespace detail + /** * Bridge class between the logging::Manager and backend writer threads. The * Manager instantiates one \a WriterFrontend for each open logging filter. @@ -84,13 +149,14 @@ public: * FlushWriteBuffer(). The backend writer triggers this with a * message at every heartbeat. * - * See WriterBackend::Writer() for arguments (except that this method - * takes only a single record, not an array). The method takes - * ownership of \a vals. + * If the frontend has remote logging enabled, the record is also + * published to interested peers. * + * @param rec Representation of the log record. Callee takes ownership. + * This method must only be called from the main thread. */ - void Write(int num_fields, threading::Value** vals); + void Write(detail::LogRecord&& rec); /** * Sets the buffering state. @@ -185,8 +251,6 @@ public: protected: friend class Manager; - void DeleteVals(int num_fields, threading::Value** vals); - EnumVal* stream; EnumVal* writer; @@ -204,8 +268,7 @@ protected: // Buffer for bulk writes. static const int WRITER_BUFFER_SIZE = 1000; - int write_buffer_pos; // Position of next write in buffer. - threading::Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE. + detail::WriteBuffer write_buffer; // Buffer of size WRITER_BUFFER_SIZE. private: void CleanupWriteBuffer();