Merge remote-tracking branch 'origin/topic/justin/file_analysis_speedup'

* origin/topic/justin/file_analysis_speedup:
  speed up file analysis, remove IncrementByteCount
Arne Welzel 2025-05-09 17:25:38 +02:00
commit 6b6c3dbbb7
5 changed files with 34 additions and 15 deletions

CHANGES

@@ -1,3 +1,10 @@
+8.0.0-dev.82 | 2025-05-09 17:25:38 +0200
+
+  * speed up file analysis, remove IncrementByteCount (Justin Azoff)
+
+    Avoid creating and recreating count objects for each chunk of file
+    analyzed. This replaces counts inside of records with c++ uint64_ts.
+
 8.0.0-dev.80 | 2025-05-09 09:51:48 +0200
 
   * Updating submodules binpac, broker and zeek-aux [nomail] (Arne Welzel, Corelight)
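
For context on the cost being removed: the old IncrementByteCount() looked the current CountVal up in the fa_file record and assigned a freshly allocated replacement, once per delivered chunk. A minimal sketch of that round trip, using hypothetical stand-ins rather than Zeek's actual value types:

#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-ins for Zeek's CountVal/RecordVal, for illustration only.
struct CountVal {
    uint64_t v;
};

struct Record {
    std::vector<std::shared_ptr<CountVal>> fields{8};
    // Boxing: every Assign() allocates a fresh CountVal.
    void Assign(int f, uint64_t v) { fields[f] = std::make_shared<CountVal>(CountVal{v}); }
    uint64_t GetCount(int f) const { return fields[f] ? fields[f]->v : 0; }
};

// The removed pattern: one field lookup plus one heap allocation per chunk.
void IncrementByteCount(Record& rec, int field_idx, uint64_t size) {
    uint64_t old = rec.GetCount(field_idx); // unbox the current value
    rec.Assign(field_idx, old + size);      // box a brand-new CountVal
}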

VERSION

@@ -1 +1 @@
-8.0.0-dev.80
+8.0.0-dev.82

src/Val.h

@@ -1310,11 +1310,15 @@ public:
         return cast_intrusive<T>(GetField(field));
     }
 
+    // Returns true if the slot for the given field is initialized.
+    // This helper can be used to guard GetFieldAs() accesses.
+    bool HasRawField(int field) const { return record_val[field].has_value(); }
+
     // The following return the given field converted to a particular
     // underlying value. We provide these to enable efficient
     // access to record fields (without requiring an intermediary Val).
     // It is up to the caller to ensure that the field exists in the
-    // record (using HasField(), if necessary).
+    // record (using HasRawField(), if necessary).
     template<typename T, typename std::enable_if_t<is_zeek_val_v<T>, bool> = true>
     auto GetFieldAs(int field) const -> std::invoke_result_t<decltype(&T::Get), T> {
         if constexpr ( std::is_same_v<T, BoolVal> || std::is_same_v<T, IntVal> || std::is_same_v<T, EnumVal> )
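
The guard the updated comment refers to is the pattern File::LookupFieldDefaultCount() adopts later in this commit; condensed here into a free function (the name and signature are invented for illustration, the calls are the ones from the diff):

#include <zeek/Val.h>

// Guarded fast path: read the raw count when the slot is initialized,
// otherwise materialize the default value the slow way.
uint64_t LookupCountOrDefault(const zeek::RecordValPtr& val, int idx) {
    if ( val->HasRawField(idx) )
        return val->GetFieldAs<zeek::CountVal>(idx); // no intermediary Val
    return val->GetFieldOrDefault(idx)->AsCount();   // builds and unwraps a ValPtr
}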

src/file_analysis/File.cc

@@ -86,6 +86,9 @@ File::File(const std::string& file_id, const std::string& source_name, Connectio
       reassembly_enabled(false),
       postpone_timeout(false),
       done(false),
+      seen_bytes(0),
+      missing_bytes(0),
+      overflow_bytes(0),
       analyzers(this) {
     StaticInit();
 
@@ -147,6 +150,9 @@ void File::RaiseFileOverNewConnection(Connection* conn, bool is_orig) {
 }
 
 uint64_t File::LookupFieldDefaultCount(int idx) const {
+    if ( val->HasRawField(idx) )
+        return val->GetFieldAs<zeek::CountVal>(idx);
+
     auto v = val->GetFieldOrDefault(idx);
     return v->AsCount();
 }
@@ -192,23 +198,19 @@ bool File::SetExtractionLimit(RecordValPtr args, uint64_t bytes) {
     return true;
 }
 
-void File::IncrementByteCount(uint64_t size, int field_idx) {
-    uint64_t old = LookupFieldDefaultCount(field_idx);
-    val->Assign(field_idx, old + size);
-}
-
 void File::SetTotalBytes(uint64_t size) {
     DBG_LOG(DBG_FILE_ANALYSIS, "[%s] Total bytes %" PRIu64, id.c_str(), size);
     val->Assign(total_bytes_idx, size);
 }
 
 bool File::IsComplete() const {
-    const auto& total = val->GetField(total_bytes_idx);
-
-    if ( ! total )
+    // If total_bytes hasn't been initialized yet, file is certainly not complete.
+    if ( ! val->HasRawField(total_bytes_idx) )
         return false;
 
-    if ( stream_offset >= total->AsCount() )
+    auto total = val->GetFieldAs<zeek::CountVal>(total_bytes_idx);
+
+    if ( stream_offset >= total )
         return true;
 
     return false;
@@ -372,7 +374,7 @@ void File::DeliverStream(const u_char* data, uint64_t len) {
     }
 
     stream_offset += len;
-    IncrementByteCount(len, seen_bytes_idx);
+    seen_bytes += len;
 }
 
 void File::DeliverChunk(const u_char* data, uint64_t len, uint64_t offset) {
@@ -388,7 +390,7 @@ void File::DeliverChunk(const u_char* data, uint64_t len, uint64_t offset) {
         if ( reassembly_max_buffer > 0 && reassembly_max_buffer < file_reassembler->TotalSize() ) {
             uint64_t current_offset = stream_offset;
             uint64_t gap_bytes = file_reassembler->Flush();
-            IncrementByteCount(gap_bytes, overflow_bytes_idx);
+            overflow_bytes += gap_bytes;
 
             if ( FileEventAvailable(file_reassembly_overflow) ) {
                 FileEvent(file_reassembly_overflow, {val, val_mgr->Count(current_offset), val_mgr->Count(gap_bytes)});
@@ -411,7 +413,7 @@ void File::DeliverChunk(const u_char* data, uint64_t len, uint64_t offset) {
         }
         else {
             // We can't reassemble so we throw out the data for streaming.
-            IncrementByteCount(len, overflow_bytes_idx);
+            overflow_bytes += len;
         }
 
     DBG_LOG(DBG_FILE_ANALYSIS, "[%s] %" PRIu64 " chunk bytes in at offset %" PRIu64 "; %s [%s%s]", id.c_str(), len,
@@ -513,7 +515,7 @@ void File::Gap(uint64_t offset, uint64_t len) {
         analyzers.DrainModifications();
 
     stream_offset += len;
-    IncrementByteCount(len, missing_bytes_idx);
+    missing_bytes += len;
 }
 
 bool File::FileEventAvailable(EventHandlerPtr h) { return h && ! file_mgr->IsIgnored(id); }
@@ -526,6 +528,9 @@ void File::FileEvent(EventHandlerPtr h) {
 }
 
 void File::FileEvent(EventHandlerPtr h, Args args) {
+    val->Assign(seen_bytes_idx, seen_bytes);
+    val->Assign(missing_bytes_idx, missing_bytes);
+    val->Assign(overflow_bytes_idx, overflow_bytes);
     event_mgr.Enqueue(h, std::move(args));
 
     if ( h == file_new || h == file_over_new_connection || h == file_sniff || h == file_timeout ||
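
Taken together, the File.cc changes split each counter into a hot-path integer plus a per-event write-back. A runnable toy model of that flow, with made-up names standing in for the real members:

#include <cstdint>
#include <cstdio>

// Toy model: the counter lives as a plain integer on the delivery path and is
// flushed into the (stand-in) record slot only when an event fires.
struct ToyFile {
    uint64_t seen_bytes = 0;        // hot-path counter, plain C++ integer
    uint64_t record_seen_bytes = 0; // stands in for val->Assign(seen_bytes_idx, ...)

    void DeliverStream(uint64_t len) { seen_bytes += len; } // per chunk: just an add

    void FileEvent() {
        record_seen_bytes = seen_bytes; // flush before script-land can observe it
        std::printf("event sees seen_bytes=%llu\n",
                    static_cast<unsigned long long>(record_seen_bytes));
    }
};

int main() {
    ToyFile f;
    for ( int i = 0; i < 1000; ++i )
        f.DeliverStream(1500); // a thousand chunks, zero allocations
    f.FileEvent();             // a single write-back
}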

src/file_analysis/File.h

@@ -325,6 +325,9 @@ protected:
     bool reassembly_enabled; /**< Whether file stream reassembly is needed. */
     bool postpone_timeout;   /**< Whether postponing timeout is requested. */
     bool done;               /**< If this object is about to be deleted. */
+    uint64_t seen_bytes;     /**< Number of bytes processed for this file. */
+    uint64_t missing_bytes;  /**< Number of bytes missed for this file. */
+    uint64_t overflow_bytes; /**< Number of bytes not delivered. */
     detail::AnalyzerSet analyzers; /**< A set of attached file analyzers. */
     std::list<Analyzer*> done_analyzers; /**< Analyzers we're done with, remembered here until they
                                               can be safely deleted. */