mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge remote-tracking branch 'origin/topic/awelzel/less-threading-star-star'
* origin/topic/awelzel/less-threading-star-star: logging/WriterFrontend: No need for explicit CleanupWriteBuffer() logging: Switch index-assignment of raw pointers to emplace_back() broker/logging: Change threading::Value** usage std::vector instead threading/Value: Support move and copy constructors
This commit is contained in:
commit
f6fdd16b81
11 changed files with 278 additions and 191 deletions
9
NEWS
9
NEWS
|
@ -16,6 +16,15 @@ Breaking Changes
|
|||
new ``OpaqueVal::DoSerializeData`` and ``OpaqueVal::DoUnserializeData``
|
||||
methods.
|
||||
|
||||
* Certain internal methods on the broker and logging classes have been changed to
|
||||
accept std::vector<threading::Value> parameters instead of threading::Value**
|
||||
to leverage automatic memory management, reduce the number of allocations
|
||||
and use move semantics to express ownership.
|
||||
|
||||
The DoWrite() and HookLogWrite() methods which can be provided by plugins
|
||||
are not affected by this change, so we keep backwards compatibility with
|
||||
existing log writers.
|
||||
|
||||
New Functionality
|
||||
-----------------
|
||||
|
||||
|
|
|
@ -680,8 +680,8 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer, const logging::
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int num_fields,
|
||||
const threading::Value* const* vals) {
|
||||
bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, const string& path,
|
||||
const logging::detail::LogRecord& rec) {
|
||||
if ( bstate->endpoint.is_shutdown() )
|
||||
return true;
|
||||
|
||||
|
@ -709,16 +709,17 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
|
|||
|
||||
fmt.StartWrite();
|
||||
|
||||
bool success = fmt.Write(num_fields, "num_fields");
|
||||
// Cast to int for binary compatibility.
|
||||
bool success = fmt.Write(static_cast<int>(rec.size()), "num_fields");
|
||||
|
||||
if ( ! success ) {
|
||||
reporter->Error("Failed to remotely log stream %s: num_fields serialization failed", stream_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
for ( int i = 0; i < num_fields; ++i ) {
|
||||
if ( ! vals[i]->Write(&fmt) ) {
|
||||
reporter->Error("Failed to remotely log stream %s: field %d serialization failed", stream_id, i);
|
||||
for ( size_t i = 0; i < rec.size(); ++i ) {
|
||||
if ( ! rec[i].Write(&fmt) ) {
|
||||
reporter->Error("Failed to remotely log stream %s: field %zu serialization failed", stream_id, i);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -1375,16 +1376,10 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
|
|||
return false;
|
||||
}
|
||||
|
||||
auto vals = new threading::Value*[num_fields];
|
||||
logging::detail::LogRecord rec(num_fields);
|
||||
|
||||
for ( int i = 0; i < num_fields; ++i ) {
|
||||
vals[i] = new threading::Value;
|
||||
|
||||
if ( ! vals[i]->Read(&fmt) ) {
|
||||
for ( int j = 0; j <= i; ++j )
|
||||
delete vals[j];
|
||||
|
||||
delete[] vals;
|
||||
if ( ! rec[i].Read(&fmt) ) {
|
||||
reporter->Warning("failed to unserialize remote log field %d for stream: %s", i,
|
||||
c_str_safe(stream_id_name).c_str());
|
||||
|
||||
|
@ -1392,7 +1387,7 @@ bool Manager::ProcessMessage(std::string_view, broker::zeek::LogWrite& lw) {
|
|||
}
|
||||
}
|
||||
|
||||
log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, num_fields, vals);
|
||||
log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), path, std::move(rec));
|
||||
fmt.EndRead();
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -223,18 +223,16 @@ public:
|
|||
const broker::endpoint_info& peer = NoPeer);
|
||||
|
||||
/**
|
||||
* Send a log entry to any interested peers. The topic name used is
|
||||
* implicitly "bro/log/<stream-name>".
|
||||
* Send a log entry to any interested peers.
|
||||
*
|
||||
* @param stream the stream to which the log entry belongs.
|
||||
* @param writer the writer to use for outputting this log entry.
|
||||
* @param path the log path to output the log entry to.
|
||||
* @param num_vals the number of fields to log.
|
||||
* @param vals the log values to log, of size num_vals.
|
||||
* See the Broker::SendFlags record type.
|
||||
* @param rec the log record.
|
||||
* @return true if the message is sent successfully.
|
||||
*/
|
||||
bool PublishLogWrite(EnumVal* stream, EnumVal* writer, std::string path, int num_vals,
|
||||
const threading::Value* const* vals);
|
||||
bool PublishLogWrite(EnumVal* stream, EnumVal* writer, const std::string& path,
|
||||
const logging::detail::LogRecord& rec);
|
||||
|
||||
/**
|
||||
* Automatically send an event to any interested peers whenever it is
|
||||
|
|
|
@ -1153,29 +1153,34 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
|
|||
}
|
||||
|
||||
// Alright, can do the write now.
|
||||
auto rec = RecordToLogRecord(stream, filter, columns.get());
|
||||
|
||||
threading::Value** vals = RecordToFilterVals(stream, filter, columns.get());
|
||||
if ( zeek::plugin_mgr->HavePluginForHook(zeek::plugin::HOOK_LOG_WRITE) ) {
|
||||
// The current HookLogWrite API takes a threading::Value**.
|
||||
// Fabricate the pointer array on the fly. Mutation is allowed.
|
||||
std::vector<threading::Value*> vals;
|
||||
vals.reserve(rec.size());
|
||||
for ( auto& v : rec )
|
||||
vals.emplace_back(&v);
|
||||
|
||||
if ( ! PLUGIN_HOOK_WITH_RESULT(HOOK_LOG_WRITE,
|
||||
HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup(
|
||||
bool res =
|
||||
zeek::plugin_mgr->HookLogWrite(filter->writer->GetType()->AsEnumType()->Lookup(
|
||||
filter->writer->InternalInt()),
|
||||
filter->name, *info, filter->num_fields, filter->fields, vals),
|
||||
true) ) {
|
||||
DeleteVals(filter->num_fields, vals);
|
||||
|
||||
#ifdef DEBUG
|
||||
filter->name, *info, filter->num_fields, filter->fields, &vals[0]);
|
||||
if ( ! res ) {
|
||||
DBG_LOG(DBG_LOGGING, "Hook prevented writing to filter '%s' on stream '%s'", filter->name.c_str(),
|
||||
stream->name.c_str());
|
||||
#endif
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
assert(w != stream->writers.end());
|
||||
w->second->total_writes->Inc();
|
||||
|
||||
// Write takes ownership of vals.
|
||||
assert(writer);
|
||||
writer->Write(filter->num_fields, vals);
|
||||
writer->Write(std::move(rec));
|
||||
|
||||
#ifdef DEBUG
|
||||
DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'", filter->name.c_str(), stream->name.c_str());
|
||||
|
@ -1385,35 +1390,38 @@ bool Manager::SetMaxDelayQueueSize(const EnumValPtr& id, zeek_uint_t queue_size)
|
|||
return true;
|
||||
}
|
||||
|
||||
threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
|
||||
threading::Value Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
|
||||
if ( ! val )
|
||||
return new threading::Value(ty->Tag(), false);
|
||||
return {ty->Tag(), false};
|
||||
|
||||
threading::Value* lval = new threading::Value(ty->Tag());
|
||||
threading::Value lval{ty->Tag()};
|
||||
|
||||
switch ( lval->type ) {
|
||||
switch ( lval.type ) {
|
||||
case TYPE_BOOL:
|
||||
case TYPE_INT: lval->val.int_val = val->AsInt(); break;
|
||||
case TYPE_INT: lval.val.int_val = val->AsInt(); break;
|
||||
|
||||
case TYPE_ENUM: {
|
||||
const char* s = ty->AsEnumType()->Lookup(val->AsInt());
|
||||
|
||||
if ( s ) {
|
||||
auto len = strlen(s);
|
||||
lval->val.string_val.data = util::copy_string(s, len);
|
||||
lval->val.string_val.length = len;
|
||||
lval.val.string_val.data = util::copy_string(s, len);
|
||||
lval.val.string_val.length = len;
|
||||
}
|
||||
|
||||
else {
|
||||
auto err_msg = "enum type does not contain value:" + std::to_string(val->AsInt());
|
||||
ty->Error(err_msg.c_str());
|
||||
lval->val.string_val.data = util::copy_string("", 0);
|
||||
lval->val.string_val.length = 0;
|
||||
lval.val.string_val.data = util::copy_string("", 0);
|
||||
lval.val.string_val.length = 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_COUNT: lval->val.uint_val = val->AsCount(); break;
|
||||
case TYPE_COUNT: {
|
||||
lval.val.uint_val = val->AsCount();
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_PORT: {
|
||||
auto p = val->AsCount();
|
||||
|
@ -1427,26 +1435,26 @@ threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
|
|||
else if ( pm == ICMP_PORT_MASK )
|
||||
pt = TRANSPORT_ICMP;
|
||||
|
||||
lval->val.port_val.port = p & ~PORT_SPACE_MASK;
|
||||
lval->val.port_val.proto = pt;
|
||||
lval.val.port_val.port = p & ~PORT_SPACE_MASK;
|
||||
lval.val.port_val.proto = pt;
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_SUBNET: val->AsSubNet()->Get().ConvertToThreadingValue(&lval->val.subnet_val); break;
|
||||
case TYPE_SUBNET: val->AsSubNet()->Get().ConvertToThreadingValue(&lval.val.subnet_val); break;
|
||||
|
||||
case TYPE_ADDR: val->AsAddr()->Get().ConvertToThreadingValue(&lval->val.addr_val); break;
|
||||
case TYPE_ADDR: val->AsAddr()->Get().ConvertToThreadingValue(&lval.val.addr_val); break;
|
||||
|
||||
case TYPE_DOUBLE:
|
||||
case TYPE_TIME:
|
||||
case TYPE_INTERVAL: lval->val.double_val = val->AsDouble(); break;
|
||||
case TYPE_INTERVAL: lval.val.double_val = val->AsDouble(); break;
|
||||
|
||||
case TYPE_STRING: {
|
||||
const String* s = val->AsString()->AsString();
|
||||
char* buf = new char[s->Len()];
|
||||
memcpy(buf, s->Bytes(), s->Len());
|
||||
|
||||
lval->val.string_val.data = buf;
|
||||
lval->val.string_val.length = s->Len();
|
||||
lval.val.string_val.data = buf;
|
||||
lval.val.string_val.length = s->Len();
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1454,8 +1462,8 @@ threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
|
|||
const File* f = val->AsFile();
|
||||
const char* s = f->Name();
|
||||
auto len = strlen(s);
|
||||
lval->val.string_val.data = util::copy_string(s, len);
|
||||
lval->val.string_val.length = len;
|
||||
lval.val.string_val.data = util::copy_string(s, len);
|
||||
lval.val.string_val.length = len;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1465,8 +1473,8 @@ threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
|
|||
f->Describe(&d);
|
||||
const char* s = d.Description();
|
||||
auto len = strlen(s);
|
||||
lval->val.string_val.data = util::copy_string(s, len);
|
||||
lval->val.string_val.length = len;
|
||||
lval.val.string_val.data = util::copy_string(s, len);
|
||||
lval.val.string_val.length = len;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1483,12 +1491,12 @@ threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
|
|||
auto& set_t = tbl_t->GetIndexTypes()[0];
|
||||
bool is_managed = ZVal::IsManagedType(set_t);
|
||||
|
||||
lval->val.set_val.size = set->Length();
|
||||
lval->val.set_val.vals = new threading::Value*[lval->val.set_val.size];
|
||||
lval.val.set_val.size = set->Length();
|
||||
lval.val.set_val.vals = new threading::Value*[lval.val.set_val.size];
|
||||
|
||||
for ( zeek_int_t i = 0; i < lval->val.set_val.size; i++ ) {
|
||||
for ( zeek_int_t i = 0; i < lval.val.set_val.size; i++ ) {
|
||||
std::optional<ZVal> s_i = ZVal(set->Idx(i), set_t);
|
||||
lval->val.set_val.vals[i] = ValToLogVal(s_i, set_t.get());
|
||||
lval.val.set_val.vals[i] = new threading::Value(ValToLogVal(s_i, set_t.get()));
|
||||
if ( is_managed )
|
||||
ZVal::DeleteManagedType(*s_i);
|
||||
}
|
||||
|
@ -1498,26 +1506,26 @@ threading::Value* Manager::ValToLogVal(std::optional<ZVal>& val, Type* ty) {
|
|||
|
||||
case TYPE_VECTOR: {
|
||||
VectorVal* vec = val->AsVector();
|
||||
lval->val.vector_val.size = vec->Size();
|
||||
lval->val.vector_val.vals = new threading::Value*[lval->val.vector_val.size];
|
||||
lval.val.vector_val.size = vec->Size();
|
||||
lval.val.vector_val.vals = new threading::Value*[lval.val.vector_val.size];
|
||||
|
||||
auto& vv = vec->RawVec();
|
||||
auto& vt = vec->GetType()->Yield();
|
||||
|
||||
for ( zeek_int_t i = 0; i < lval->val.vector_val.size; i++ ) {
|
||||
lval->val.vector_val.vals[i] = ValToLogVal(vv[i], vt.get());
|
||||
for ( zeek_int_t i = 0; i < lval.val.vector_val.size; i++ ) {
|
||||
lval.val.vector_val.vals[i] = new threading::Value(ValToLogVal(vv[i], vt.get()));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
default: reporter->InternalError("unsupported type %s for log_write", type_name(lval->type));
|
||||
default: reporter->InternalError("unsupported type %s for log_write", type_name(lval.type));
|
||||
}
|
||||
|
||||
return lval;
|
||||
}
|
||||
|
||||
threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns) {
|
||||
detail::LogRecord Manager::RecordToLogRecord(const Stream* stream, Filter* filter, RecordVal* columns) {
|
||||
RecordValPtr ext_rec;
|
||||
|
||||
if ( filter->num_ext_fields > 0 ) {
|
||||
|
@ -1527,7 +1535,9 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil
|
|||
ext_rec = {AdoptRef{}, res.release()->AsRecordVal()};
|
||||
}
|
||||
|
||||
threading::Value** vals = new threading::Value*[filter->num_fields];
|
||||
// Allocate storage for all vals.
|
||||
detail::LogRecord vals;
|
||||
vals.reserve(filter->num_fields);
|
||||
|
||||
for ( int i = 0; i < filter->num_fields; ++i ) {
|
||||
std::optional<ZVal> val;
|
||||
|
@ -1535,7 +1545,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil
|
|||
if ( i < filter->num_ext_fields ) {
|
||||
if ( ! ext_rec ) {
|
||||
// executing function did not return record. Send empty for all vals.
|
||||
vals[i] = new threading::Value(filter->fields[i]->type, false);
|
||||
vals.emplace_back(filter->fields[i]->type, false);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1557,7 +1567,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil
|
|||
|
||||
if ( ! val ) {
|
||||
// Value, or any of its parents, is not set.
|
||||
vals[i] = new threading::Value(filter->fields[i]->type, false);
|
||||
vals.emplace_back(filter->fields[i]->type, false);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1565,7 +1575,7 @@ threading::Value** Manager::RecordToFilterVals(const Stream* stream, Filter* fil
|
|||
}
|
||||
|
||||
if ( val )
|
||||
vals[i] = ValToLogVal(val, vt);
|
||||
vals.emplace_back(ValToLogVal(val, vt));
|
||||
}
|
||||
|
||||
return vals;
|
||||
|
@ -1688,16 +1698,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
|
|||
return winfo->writer;
|
||||
}
|
||||
|
||||
void Manager::DeleteVals(int num_fields, threading::Value** vals) {
|
||||
// Note this code is duplicated in WriterBackend::DeleteVals().
|
||||
for ( int i = 0; i < num_fields; i++ )
|
||||
delete vals[i];
|
||||
|
||||
delete[] vals;
|
||||
}
|
||||
|
||||
bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, int num_fields,
|
||||
threading::Value** vals) {
|
||||
bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path, detail::LogRecord&& rec) {
|
||||
Stream* stream = FindStream(id);
|
||||
|
||||
if ( ! stream ) {
|
||||
|
@ -1707,12 +1708,10 @@ bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path,
|
|||
id->Describe(&desc);
|
||||
DBG_LOG(DBG_LOGGING, "unknown stream %s in Manager::Write()", desc.Description());
|
||||
#endif
|
||||
DeleteVals(num_fields, vals);
|
||||
return false;
|
||||
}
|
||||
|
||||
if ( ! stream->enabled ) {
|
||||
DeleteVals(num_fields, vals);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1725,11 +1724,10 @@ bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, const string& path,
|
|||
id->Describe(&desc);
|
||||
DBG_LOG(DBG_LOGGING, "unknown writer %s in Manager::Write()", desc.Description());
|
||||
#endif
|
||||
DeleteVals(num_fields, vals);
|
||||
return false;
|
||||
}
|
||||
|
||||
w->second->writer->Write(num_fields, vals);
|
||||
w->second->writer->Write(std::move(rec));
|
||||
|
||||
DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to path '%s' on stream '%s'", path.c_str(), stream->name.c_str());
|
||||
|
||||
|
|
|
@ -268,9 +268,10 @@ public:
|
|||
const threading::Field* const* fields);
|
||||
|
||||
/**
|
||||
* Writes out log entries that have already passed through all
|
||||
* filters (and have raised any events). This is meant called for logs
|
||||
* received already processed from remote.
|
||||
* Writes out log entries received from remote nodes.
|
||||
*
|
||||
* The given record has passed through all policy filters and raised events
|
||||
* on the sending node. It's only meant to be written out.
|
||||
*
|
||||
* @param stream The enum value corresponding to the log stream.
|
||||
*
|
||||
|
@ -278,13 +279,11 @@ public:
|
|||
*
|
||||
* @param path The path of the target log stream to write to.
|
||||
*
|
||||
* @param num_fields The number of log values to write.
|
||||
*
|
||||
* @param vals An array of log values to write, of size num_fields.
|
||||
* The method takes ownership of the array.
|
||||
* @param rec Representation of the log record to write.
|
||||
|
||||
* @return Returns true if the record was processed successfully.
|
||||
*/
|
||||
bool WriteFromRemote(EnumVal* stream, EnumVal* writer, const std::string& path, int num_fields,
|
||||
threading::Value** vals);
|
||||
bool WriteFromRemote(EnumVal* id, EnumVal* writer, const std::string& path, detail::LogRecord&& rec);
|
||||
|
||||
/**
|
||||
* Announces all instantiated writers to a given Broker peer.
|
||||
|
@ -365,9 +364,6 @@ protected:
|
|||
bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name, double open, double close,
|
||||
bool success, bool terminating);
|
||||
|
||||
// Deletes the values as passed into Write().
|
||||
void DeleteVals(int num_fields, threading::Value** vals);
|
||||
|
||||
private:
|
||||
struct Filter;
|
||||
struct Stream;
|
||||
|
@ -376,9 +372,9 @@ private:
|
|||
bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude,
|
||||
const std::string& path, const std::list<int>& indices);
|
||||
|
||||
threading::Value** RecordToFilterVals(const Stream* stream, Filter* filter, RecordVal* columns);
|
||||
detail::LogRecord RecordToLogRecord(const Stream* stream, Filter* filter, RecordVal* columns);
|
||||
threading::Value ValToLogVal(std::optional<ZVal>& val, Type* ty);
|
||||
|
||||
threading::Value* ValToLogVal(std::optional<ZVal>& val, Type* ty);
|
||||
Stream* FindStream(EnumVal* id);
|
||||
void RemoveDisabledWriters(Stream* stream);
|
||||
void InstallRotationTimer(WriterInfo* winfo);
|
||||
|
|
|
@ -181,7 +181,7 @@ bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) {
|
||||
bool WriterBackend::Write(int arg_num_fields, zeek::Span<detail::LogRecord> records) {
|
||||
// Double-check that the arguments match. If we get this from remote,
|
||||
// something might be mixed up.
|
||||
if ( num_fields != arg_num_fields ) {
|
||||
|
@ -191,22 +191,20 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) {
|
|||
Debug(DBG_LOGGING, msg);
|
||||
#endif
|
||||
|
||||
DeleteVals(num_writes, vals);
|
||||
DisableFrontend();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Double-check all the types match.
|
||||
for ( int j = 0; j < num_writes; j++ ) {
|
||||
for ( size_t j = 0; j < records.size(); j++ ) {
|
||||
for ( int i = 0; i < num_fields; ++i ) {
|
||||
if ( vals[j][i]->type != fields[i]->type ) {
|
||||
if ( records[j][i].type != fields[i]->type ) {
|
||||
#ifdef DEBUG
|
||||
const char* msg = Fmt("Field #%d type doesn't match in WriterBackend::Write() (%d vs. %d)", i,
|
||||
vals[j][i]->type, fields[i]->type);
|
||||
records[j][i].type, fields[i]->type);
|
||||
Debug(DBG_LOGGING, msg);
|
||||
#endif
|
||||
DisableFrontend();
|
||||
DeleteVals(num_writes, vals);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -215,16 +213,30 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals) {
|
|||
bool success = true;
|
||||
|
||||
if ( ! Failed() ) {
|
||||
for ( int j = 0; j < num_writes; j++ ) {
|
||||
success = DoWrite(num_fields, fields, vals[j]);
|
||||
// Populate a Value* array for backwards compat with plugin
|
||||
// provided WriterBackend implementations that expect to
|
||||
// receive a threading::Value**.
|
||||
//
|
||||
// We keep the raw pointer for this API, as threading::Value
|
||||
// itself manages strings, sets and vectors using raw pointers,
|
||||
// so this is more consistent than mixing.
|
||||
std::vector<Value*> valps;
|
||||
valps.reserve(num_fields);
|
||||
|
||||
for ( size_t j = 0; j < records.size(); j++ ) {
|
||||
auto& write_vals = records[j];
|
||||
for ( int f = 0; f < num_fields; f++ )
|
||||
valps.emplace_back(&write_vals[f]);
|
||||
|
||||
success = DoWrite(num_fields, fields, &valps[0]);
|
||||
|
||||
valps.clear();
|
||||
|
||||
if ( ! success )
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
DeleteVals(num_writes, vals);
|
||||
|
||||
if ( ! success )
|
||||
DisableFrontend();
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "zeek/Span.h"
|
||||
#include "zeek/logging/Component.h"
|
||||
#include "zeek/threading/MsgThread.h"
|
||||
|
||||
|
@ -13,6 +14,12 @@ class data;
|
|||
|
||||
namespace zeek::logging {
|
||||
|
||||
namespace detail {
|
||||
|
||||
using LogRecord = std::vector<threading::Value>;
|
||||
|
||||
}
|
||||
|
||||
class WriterFrontend;
|
||||
|
||||
/**
|
||||
|
@ -137,21 +144,16 @@ public:
|
|||
bool Init(int num_fields, const threading::Field* const* fields);
|
||||
|
||||
/**
|
||||
* Writes one log entry.
|
||||
* Write a batch of log records.
|
||||
*
|
||||
* @param num_fields: The number of log fields for this stream. The
|
||||
* value must match what was passed to Init().
|
||||
*
|
||||
* @param An array of size \a num_fields with the log values. Their
|
||||
* types must match with the field passed to Init(). The method
|
||||
* takes ownership of \a vals..
|
||||
*
|
||||
* Returns false if an error occurred, in which case the writer must
|
||||
* not be used any further.
|
||||
* @param records Span of LogRecord instances to write out.
|
||||
*
|
||||
* @return False if an error occurred.
|
||||
*/
|
||||
bool Write(int num_fields, int num_writes, threading::Value*** vals);
|
||||
bool Write(int arg_num_fields, zeek::Span<detail::LogRecord> records);
|
||||
|
||||
/**
|
||||
* Sets the buffering status for the writer, assuming the writer
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include "zeek/logging/WriterFrontend.h"
|
||||
|
||||
#include "zeek/RunState.h"
|
||||
#include "zeek/Span.h"
|
||||
#include "zeek/broker/Manager.h"
|
||||
#include "zeek/logging/Manager.h"
|
||||
#include "zeek/logging/WriterBackend.h"
|
||||
|
@ -50,18 +51,16 @@ private:
|
|||
|
||||
class WriteMessage final : public threading::InputMessage<WriterBackend> {
|
||||
public:
|
||||
WriteMessage(WriterBackend* backend, int num_fields, int num_writes, Value*** vals)
|
||||
WriteMessage(WriterBackend* backend, int num_fields, std::vector<detail::LogRecord>&& records)
|
||||
: threading::InputMessage<WriterBackend>("Write", backend),
|
||||
num_fields(num_fields),
|
||||
num_writes(num_writes),
|
||||
vals(vals) {}
|
||||
records(std::move(records)) {}
|
||||
|
||||
bool Process() override { return Object()->Write(num_fields, num_writes, vals); }
|
||||
bool Process() override { return Object()->Write(num_fields, zeek::Span{records}); }
|
||||
|
||||
private:
|
||||
int num_fields;
|
||||
int num_writes;
|
||||
Value*** vals;
|
||||
std::vector<detail::LogRecord> records;
|
||||
};
|
||||
|
||||
class SetBufMessage final : public threading::InputMessage<WriterBackend> {
|
||||
|
@ -89,7 +88,8 @@ private:
|
|||
// Frontend methods.
|
||||
|
||||
WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer,
|
||||
bool arg_local, bool arg_remote) {
|
||||
bool arg_local, bool arg_remote)
|
||||
: write_buffer(detail::WriteBuffer(WRITER_BUFFER_SIZE)) {
|
||||
stream = arg_stream;
|
||||
writer = arg_writer;
|
||||
Ref(stream);
|
||||
|
@ -99,8 +99,6 @@ WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVa
|
|||
buf = true;
|
||||
local = arg_local;
|
||||
remote = arg_remote;
|
||||
write_buffer = nullptr;
|
||||
write_buffer_pos = 0;
|
||||
info = new WriterBackend::WriterInfo(arg_info);
|
||||
|
||||
num_fields = 0;
|
||||
|
@ -134,7 +132,6 @@ WriterFrontend::~WriterFrontend() {
|
|||
|
||||
void WriterFrontend::Stop() {
|
||||
if ( disabled ) {
|
||||
CleanupWriteBuffer();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -173,57 +170,42 @@ void WriterFrontend::Init(int arg_num_fields, const Field* const* arg_fields) {
|
|||
}
|
||||
}
|
||||
|
||||
void WriterFrontend::Write(int arg_num_fields, Value** vals) {
|
||||
if ( disabled ) {
|
||||
DeleteVals(arg_num_fields, vals);
|
||||
return;
|
||||
}
|
||||
void WriterFrontend::Write(detail::LogRecord&& arg_vals) {
|
||||
std::vector<threading::Value> vals = std::move(arg_vals);
|
||||
|
||||
if ( arg_num_fields != num_fields ) {
|
||||
reporter->Warning("WriterFrontend %s expected %d fields in write, got %d. Skipping line.", name, num_fields,
|
||||
arg_num_fields);
|
||||
DeleteVals(arg_num_fields, vals);
|
||||
if ( disabled )
|
||||
return;
|
||||
|
||||
if ( vals.size() != static_cast<size_t>(num_fields) ) {
|
||||
reporter->Warning("WriterFrontend %s expected %d fields in write, got %zu. Skipping line.", name, num_fields,
|
||||
vals.size());
|
||||
return;
|
||||
}
|
||||
|
||||
if ( remote ) {
|
||||
broker_mgr->PublishLogWrite(stream, writer, info->path, num_fields, vals);
|
||||
broker_mgr->PublishLogWrite(stream, writer, info->path, vals);
|
||||
}
|
||||
|
||||
if ( ! backend ) {
|
||||
DeleteVals(arg_num_fields, vals);
|
||||
if ( ! backend )
|
||||
return;
|
||||
}
|
||||
|
||||
if ( ! write_buffer ) {
|
||||
// Need new buffer.
|
||||
write_buffer = new Value**[WRITER_BUFFER_SIZE];
|
||||
write_buffer_pos = 0;
|
||||
}
|
||||
write_buffer.WriteRecord(std::move(vals));
|
||||
|
||||
write_buffer[write_buffer_pos++] = vals;
|
||||
|
||||
if ( write_buffer_pos >= WRITER_BUFFER_SIZE || ! buf || run_state::terminating )
|
||||
if ( write_buffer.Full() || ! buf || run_state::terminating )
|
||||
// Buffer full (or no buffering desired or terminating).
|
||||
FlushWriteBuffer();
|
||||
}
|
||||
|
||||
void WriterFrontend::FlushWriteBuffer() {
|
||||
if ( disabled ) {
|
||||
CleanupWriteBuffer();
|
||||
if ( disabled )
|
||||
return;
|
||||
}
|
||||
|
||||
if ( ! write_buffer_pos )
|
||||
if ( write_buffer.Empty() )
|
||||
// Nothing to do.
|
||||
return;
|
||||
|
||||
if ( backend )
|
||||
backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer));
|
||||
|
||||
// Clear buffer (no delete, we pass ownership to child thread.)
|
||||
write_buffer = nullptr;
|
||||
write_buffer_pos = 0;
|
||||
backend->SendIn(new WriteMessage(backend, num_fields, std::move(write_buffer).TakeRecords()));
|
||||
}
|
||||
|
||||
void WriterFrontend::SetBuf(bool enabled) {
|
||||
|
@ -263,24 +245,4 @@ void WriterFrontend::Rotate(const char* rotated_path, double open, double close,
|
|||
log_mgr->FinishedRotation(this, nullptr, nullptr, 0, 0, false, terminating);
|
||||
}
|
||||
|
||||
void WriterFrontend::DeleteVals(int num_fields, Value** vals) {
|
||||
// Note this code is duplicated in Manager::DeleteVals().
|
||||
for ( int i = 0; i < num_fields; i++ )
|
||||
delete vals[i];
|
||||
|
||||
delete[] vals;
|
||||
}
|
||||
|
||||
void WriterFrontend::CleanupWriteBuffer() {
|
||||
if ( ! write_buffer || write_buffer_pos == 0 )
|
||||
return;
|
||||
|
||||
for ( int j = 0; j < write_buffer_pos; j++ )
|
||||
DeleteVals(num_fields, write_buffer[j]);
|
||||
|
||||
delete[] write_buffer;
|
||||
write_buffer = nullptr;
|
||||
write_buffer_pos = 0;
|
||||
}
|
||||
|
||||
} // namespace zeek::logging
|
||||
|
|
|
@ -8,6 +8,66 @@ namespace zeek::logging {
|
|||
|
||||
class Manager;
|
||||
|
||||
|
||||
namespace detail {
|
||||
|
||||
/**
|
||||
* Implements a buffer accumulating log records in \a WriterFrontend instance
|
||||
* before passing them to \a WriterBackend instances.
|
||||
*
|
||||
* \see WriterFrontend::Write
|
||||
*/
|
||||
class WriteBuffer {
|
||||
public:
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
explicit WriteBuffer(size_t buffer_size) : buffer_size(buffer_size) {}
|
||||
|
||||
/**
|
||||
* Push a record to the buffer.
|
||||
*
|
||||
* @param record The records vals.
|
||||
*/
|
||||
void WriteRecord(LogRecord&& record) { records.emplace_back(std::move(record)); }
|
||||
|
||||
/**
|
||||
* Moves the records out of the buffer and resets it.
|
||||
*
|
||||
* @return The currently buffered log records.
|
||||
*/
|
||||
std::vector<LogRecord> TakeRecords() && {
|
||||
auto tmp = std::move(records);
|
||||
|
||||
// Re-initialize the buffer.
|
||||
records.clear();
|
||||
records.reserve(buffer_size);
|
||||
|
||||
return tmp;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The size of the buffer.
|
||||
*/
|
||||
size_t Size() const { return records.size(); }
|
||||
|
||||
/**
|
||||
* @return True if buffer is empty.
|
||||
*/
|
||||
size_t Empty() const { return records.empty(); }
|
||||
|
||||
/**
|
||||
* @return True if size equals or exceeds configured buffer size.
|
||||
*/
|
||||
bool Full() const { return records.size() >= buffer_size; }
|
||||
|
||||
private:
|
||||
size_t buffer_size;
|
||||
std::vector<LogRecord> records;
|
||||
};
|
||||
|
||||
} // namespace detail
|
||||
|
||||
/**
|
||||
* Bridge class between the logging::Manager and backend writer threads. The
|
||||
* Manager instantiates one \a WriterFrontend for each open logging filter.
|
||||
|
@ -84,13 +144,14 @@ public:
|
|||
* FlushWriteBuffer(). The backend writer triggers this with a
|
||||
* message at every heartbeat.
|
||||
*
|
||||
* See WriterBackend::Writer() for arguments (except that this method
|
||||
* takes only a single record, not an array). The method takes
|
||||
* ownership of \a vals.
|
||||
* If the frontend has remote logging enabled, the record is also
|
||||
* published to interested peers.
|
||||
*
|
||||
* @param rec Representation of the log record. Callee takes ownership.
|
||||
|
||||
* This method must only be called from the main thread.
|
||||
*/
|
||||
void Write(int num_fields, threading::Value** vals);
|
||||
void Write(detail::LogRecord&& rec);
|
||||
|
||||
/**
|
||||
* Sets the buffering state.
|
||||
|
@ -185,8 +246,6 @@ public:
|
|||
protected:
|
||||
friend class Manager;
|
||||
|
||||
void DeleteVals(int num_fields, threading::Value** vals);
|
||||
|
||||
EnumVal* stream;
|
||||
EnumVal* writer;
|
||||
|
||||
|
@ -204,11 +263,7 @@ protected:
|
|||
|
||||
// Buffer for bulk writes.
|
||||
static const int WRITER_BUFFER_SIZE = 1000;
|
||||
int write_buffer_pos; // Position of next write in buffer.
|
||||
threading::Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE.
|
||||
|
||||
private:
|
||||
void CleanupWriteBuffer();
|
||||
detail::WriteBuffer write_buffer; // Buffer of size WRITER_BUFFER_SIZE.
|
||||
};
|
||||
|
||||
} // namespace zeek::logging
|
||||
|
|
|
@ -82,6 +82,57 @@ std::string Field::TypeName() const {
|
|||
return n;
|
||||
}
|
||||
|
||||
Value::Value(const Value& other) {
|
||||
type = other.type;
|
||||
subtype = other.subtype;
|
||||
present = other.present;
|
||||
|
||||
switch ( other.type ) {
|
||||
case TYPE_ENUM:
|
||||
case TYPE_STRING:
|
||||
case TYPE_FILE:
|
||||
case TYPE_FUNC: {
|
||||
val.string_val.data = util::copy_string(other.val.string_val.data, other.val.string_val.length);
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_PATTERN: {
|
||||
val.pattern_text_val = util::copy_string(val.pattern_text_val);
|
||||
break;
|
||||
}
|
||||
case TYPE_TABLE: {
|
||||
val.set_val.vals = new Value*[other.val.set_val.size];
|
||||
for ( zeek_int_t i = 0; i < other.val.set_val.size; i++ )
|
||||
val.set_val.vals[i] = new Value(*other.val.set_val.vals[i]);
|
||||
break;
|
||||
}
|
||||
case TYPE_VECTOR: {
|
||||
val.vector_val.vals = new Value*[other.val.vector_val.size];
|
||||
for ( zeek_int_t i = 0; i < other.val.vector_val.size; i++ )
|
||||
val.vector_val.vals[i] = new Value(*other.val.vector_val.vals[i]);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
// Deal with simple/atomic types.
|
||||
val = other.val;
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Value::Value(Value&& other) noexcept {
|
||||
present = other.present;
|
||||
type = other.type;
|
||||
subtype = other.type;
|
||||
line_number = other.line_number;
|
||||
|
||||
val = other.val; // take ownership.
|
||||
|
||||
other.val = _val();
|
||||
other.line_number = -1;
|
||||
other.present = false;
|
||||
}
|
||||
|
||||
Value::~Value() {
|
||||
if ( ! present )
|
||||
return;
|
||||
|
|
|
@ -106,7 +106,7 @@ private:
|
|||
struct Value {
|
||||
TypeTag type; //! The type of the value.
|
||||
TypeTag subtype; //! Inner type for sets and vectors.
|
||||
bool present; //! False for optional record fields that are not set.
|
||||
bool present = false; //! False for optional record fields that are not set.
|
||||
|
||||
struct set_t {
|
||||
zeek_int_t size;
|
||||
|
@ -185,6 +185,16 @@ struct Value {
|
|||
Value(TypeTag arg_type, TypeTag arg_subtype, bool arg_present = true)
|
||||
: type(arg_type), subtype(arg_subtype), present(arg_present) {}
|
||||
|
||||
/**
|
||||
* Copy constructor.
|
||||
*/
|
||||
Value(const Value& other);
|
||||
|
||||
/**
|
||||
* Move constructor.
|
||||
*/
|
||||
Value(Value&& other) noexcept;
|
||||
|
||||
/**
|
||||
* Destructor.
|
||||
*/
|
||||
|
@ -241,7 +251,6 @@ struct Value {
|
|||
|
||||
private:
|
||||
friend class IPAddr;
|
||||
Value(const Value& other) = delete;
|
||||
|
||||
// For values read by the input framework, this can represent the line number
|
||||
// containing this value. Used by the Ascii reader primarily.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue