From 4600ca41f656ecc1d5ef7aa20f34e35453b0be46 Mon Sep 17 00:00:00 2001 From: Vern Paxson Date: Mon, 10 Apr 2023 11:43:19 -0700 Subject: [PATCH] logging speedup by switching to raw record access --- src/Val.h | 8 ++++ src/logging/Manager.cc | 83 ++++++++++++++++++++++++++++-------------- src/logging/Manager.h | 2 +- 3 files changed, 65 insertions(+), 28 deletions(-) diff --git a/src/Val.h b/src/Val.h index e73536f127..01e4a201ee 100644 --- a/src/Val.h +++ b/src/Val.h @@ -52,9 +52,15 @@ class HashKey; class ValTrace; class ZBody; +class CPPRuntime; } // namespace detail +namespace logging + { +class Manager; + } + namespace run_state { @@ -1403,8 +1409,10 @@ public: static void DoneParsing(); protected: + friend class zeek::logging::Manager; friend class zeek::detail::ValTrace; friend class zeek::detail::ZBody; + friend class zeek::detail::CPPRuntime; RecordValPtr DoCoerceTo(RecordTypePtr other, bool allow_orphaning) const; diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 2b39aa4ff2..f4d55086df 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -973,11 +973,8 @@ bool Manager::Write(EnumVal* id, RecordVal* columns_arg) return true; } -threading::Value* Manager::ValToLogVal(Val* val, Type* ty) +threading::Value* Manager::ValToLogVal(std::optional& val, Type* ty) { - if ( ! ty ) - ty = val->GetType().get(); - if ( ! val ) return new threading::Value(ty->Tag(), false); @@ -987,12 +984,12 @@ threading::Value* Manager::ValToLogVal(Val* val, Type* ty) { case TYPE_BOOL: case TYPE_INT: - lval->val.int_val = val->InternalInt(); + lval->val.int_val = val->AsInt(); break; case TYPE_ENUM: { - const char* s = val->GetType()->AsEnumType()->Lookup(val->InternalInt()); + const char* s = ty->AsEnumType()->Lookup(val->AsInt()); if ( s ) { @@ -1002,7 +999,8 @@ threading::Value* Manager::ValToLogVal(Val* val, Type* ty) else { - val->GetType()->Error("enum type does not contain value", val); + auto err_msg = "enum type does not contain value:" + std::to_string(val->AsInt()); + ty->Error(err_msg.c_str()); lval->val.string_val.data = util::copy_string(""); lval->val.string_val.length = 0; } @@ -1010,31 +1008,44 @@ threading::Value* Manager::ValToLogVal(Val* val, Type* ty) } case TYPE_COUNT: - lval->val.uint_val = val->InternalUnsigned(); + lval->val.uint_val = val->AsCount(); break; case TYPE_PORT: - lval->val.port_val.port = val->AsPortVal()->Port(); - lval->val.port_val.proto = val->AsPortVal()->PortType(); + { + auto p = val->AsCount(); + + auto pt = TRANSPORT_UNKNOWN; + auto pm = p & PORT_SPACE_MASK; + if ( pm == TCP_PORT_MASK ) + pt = TRANSPORT_TCP; + else if ( pm == UDP_PORT_MASK ) + pt = TRANSPORT_UDP; + else if ( pm == ICMP_PORT_MASK ) + pt = TRANSPORT_ICMP; + + lval->val.port_val.port = p & ~PORT_SPACE_MASK; + lval->val.port_val.proto = pt; break; + } case TYPE_SUBNET: - val->AsSubNet().ConvertToThreadingValue(&lval->val.subnet_val); + val->AsSubNet()->Get().ConvertToThreadingValue(&lval->val.subnet_val); break; case TYPE_ADDR: - val->AsAddr().ConvertToThreadingValue(&lval->val.addr_val); + val->AsAddr()->Get().ConvertToThreadingValue(&lval->val.addr_val); break; case TYPE_DOUBLE: case TYPE_TIME: case TYPE_INTERVAL: - lval->val.double_val = val->InternalDouble(); + lval->val.double_val = val->AsDouble(); break; case TYPE_STRING: { - const String* s = val->AsString(); + const String* s = val->AsString()->AsString(); char* buf = new char[s->Len()]; memcpy(buf, s->Bytes(), s->Len()); @@ -1065,31 +1076,44 @@ threading::Value* Manager::ValToLogVal(Val* val, Type* ty) case TYPE_TABLE: { - auto set = val->AsTableVal()->ToPureListVal(); + auto tbl = val->AsTable(); + auto set = tbl->ToPureListVal(); + if ( ! set ) // ToPureListVal has reported an internal warning // already. Just keep going by making something up. set = make_intrusive(TYPE_INT); + auto tbl_t = cast_intrusive(tbl->GetType()); + auto& set_t = tbl_t->GetIndexTypes()[0]; + bool is_managed = ZVal::IsManagedType(set_t); + lval->val.set_val.size = set->Length(); lval->val.set_val.vals = new threading::Value*[lval->val.set_val.size]; for ( zeek_int_t i = 0; i < lval->val.set_val.size; i++ ) - lval->val.set_val.vals[i] = ValToLogVal(set->Idx(i).get()); + { + std::optional s_i = ZVal(set->Idx(i), set_t); + lval->val.set_val.vals[i] = ValToLogVal(s_i, set_t.get()); + if ( is_managed ) + ZVal::DeleteManagedType(*s_i); + } break; } case TYPE_VECTOR: { - VectorVal* vec = val->AsVectorVal(); + VectorVal* vec = val->AsVector(); lval->val.vector_val.size = vec->Size(); lval->val.vector_val.vals = new threading::Value*[lval->val.vector_val.size]; + auto& vv = vec->RawVec(); + auto& vt = vec->GetType()->Yield(); + for ( zeek_int_t i = 0; i < lval->val.vector_val.size; i++ ) { - lval->val.vector_val.vals[i] = ValToLogVal(vec->ValAt(i).get(), - vec->GetType()->Yield().get()); + lval->val.vector_val.vals[i] = ValToLogVal((*vv)[i], vt.get()); } break; @@ -1118,7 +1142,8 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, R for ( int i = 0; i < filter->num_fields; ++i ) { - Val* val; + std::optional val; + Type* vt; if ( i < filter->num_ext_fields ) { if ( ! ext_rec ) @@ -1128,21 +1153,23 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, R continue; } - val = ext_rec.get(); + val = ZVal(ext_rec.get()); + vt = ext_rec->GetType().get(); } else - val = columns; + { + val = ZVal(columns); + vt = columns->GetType().get(); + } // For each field, first find the right value, which can // potentially be nested inside other records. list& indices = filter->indices[i]; - ValPtr val_ptr; - for ( list::iterator j = indices.begin(); j != indices.end(); ++j ) { - val_ptr = val->AsRecordVal()->GetField(*j); - val = val_ptr.get(); + auto vr = val->AsRecord(); + val = vr->RawOptField(*j); if ( ! val ) { @@ -1150,10 +1177,12 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, R vals[i] = new threading::Value(filter->fields[i]->type, false); break; } + + vt = cast_intrusive(vr->GetType())->GetFieldType(*j).get(); } if ( val ) - vals[i] = ValToLogVal(val); + vals[i] = ValToLogVal(val, vt); } return vals; diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 6f926242f3..cb2e554d5c 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -291,7 +291,7 @@ private: threading::Value** RecordToFilterVals(Stream* stream, Filter* filter, RecordVal* columns); - threading::Value* ValToLogVal(Val* val, Type* ty = nullptr); + threading::Value* ValToLogVal(std::optional& val, Type* ty); Stream* FindStream(EnumVal* id); void RemoveDisabledWriters(Stream* stream); void InstallRotationTimer(WriterInfo* winfo);