Merge remote-tracking branch 'origin/topic/awelzel/cluster-telemetry-follow-up'

* origin/topic/awelzel/cluster-telemetry-follow-up:
  Bump cluster test suite
  cluster/Telemetry: Cache CallExpr locations
  cluster/Telemetry: Avoid unneeded StringVal() construction
  Val: Switch TablePatternMatcher to std::string_view
  RE: Add MatchAll() and MatchSet() for std::string_view
  cluster/websocket: Fix and test for invalid X-Application-Name
  cluster/telemetry: Move topic_normalization redef to zeromq
This commit is contained in:
Arne Welzel 2025-06-30 13:29:07 +02:00
commit 5847a2d32e
12 changed files with 164 additions and 38 deletions

16
CHANGES
View file

@ -1,3 +1,19 @@
8.0.0-dev.560 | 2025-06-30 13:29:07 +0200
* cluster/Telemetry: Cache CallExpr locations (Arne Welzel, Corelight)
* cluster/Telemetry: Avoid unneeded StringVal() construction (Arne Welzel, Corelight)
* Val: Switch TablePatternMatcher to std::string_view (Arne Welzel, Corelight)
...and add TableVal::LookupPattern(std::string_view sv).
* RE: Add MatchAll() and MatchSet() for std::string_view (Arne Welzel, Corelight)
* cluster/websocket: Fix and test for invalid X-Application-Name (Arne Welzel, Corelight)
* cluster/telemetry: Move topic_normalization redef to zeromq (Arne Welzel, Corelight)
8.0.0-dev.551 | 2025-06-26 20:21:31 +0100
* Spicy SSL analyzer: make record layer version parsing more strict (Johanna Amann, Corelight)

View file

@ -1 +1 @@
8.0.0-dev.551
8.0.0-dev.560

View file

@ -29,7 +29,6 @@ export {
## Map to an empty string to skip recording a specific metric
## completely.
const topic_normalizations: table[pattern] of string = {
[/^zeek\.cluster\.nodeid\..*/] = "zeek.cluster.nodeid.__normalized__",
[/^zeek\/cluster\/nodeid\/.*/] = "zeek/cluster/nodeid/__normalized__",
} &ordered &redef;

View file

@ -257,6 +257,10 @@ function zeromq_nodeid_topic(id: string): string {
return nodeid_topic_prefix + "." + id + ".";
}
redef Cluster::Telemetry::topic_normalizations += {
[/^zeek\.cluster\.nodeid\..*/] = "zeek.cluster.nodeid.__normalized__",
};
# Unique identifier for this node with some debug information.
const my_node_id = fmt("zeromq_%s_%s_%s_%s", Cluster::node, gethostname(), getpid(), unique_id("N"));

View file

@ -189,24 +189,37 @@ std::string Specific_RE_Matcher::LookupDef(const std::string& def) {
return {};
}
bool Specific_RE_Matcher::MatchAll(const char* s) { return MatchAll((const u_char*)(s), strlen(s)); }
bool Specific_RE_Matcher::MatchAll(const char* s) { return MatchAll(std::string_view{s}); }
bool Specific_RE_Matcher::MatchAll(const String* s) {
// s->Len() does not include '\0'.
return MatchAll(s->Bytes(), s->Len());
bool Specific_RE_Matcher::MatchAll(const String* s) { return MatchAll(s->ToStdStringView()); }
bool Specific_RE_Matcher::MatchAll(std::string_view sv) {
return MatchAll(reinterpret_cast<const u_char*>(sv.data()), sv.size());
}
bool Specific_RE_Matcher::MatchSet(const String* s, std::vector<AcceptIdx>& matches) {
return MatchAll(s->Bytes(), s->Len(), &matches);
}
int Specific_RE_Matcher::Match(const char* s) { return Match((const u_char*)(s), strlen(s)); }
bool Specific_RE_Matcher::MatchSet(std::string_view sv, std::vector<AcceptIdx>& matches) {
return MatchAll(reinterpret_cast<const u_char*>(sv.data()), sv.size(), &matches);
}
int Specific_RE_Matcher::Match(const String* s) { return Match(s->Bytes(), s->Len()); }
int Specific_RE_Matcher::Match(const char* s) { return Match(std::string_view{s}); }
int Specific_RE_Matcher::LongestMatch(const char* s) { return LongestMatch((const u_char*)(s), strlen(s)); }
int Specific_RE_Matcher::Match(const String* s) { return Match(s->ToStdStringView()); }
int Specific_RE_Matcher::LongestMatch(const String* s) { return LongestMatch(s->Bytes(), s->Len()); }
int Specific_RE_Matcher::Match(std::string_view sv) {
return Match(reinterpret_cast<const u_char*>(sv.data()), sv.size());
}
int Specific_RE_Matcher::LongestMatch(const char* s) { return LongestMatch(std::string_view{s}); }
int Specific_RE_Matcher::LongestMatch(const String* s) { return LongestMatch(s->ToStdStringView()); }
int Specific_RE_Matcher::LongestMatch(std::string_view sv) {
return LongestMatch(reinterpret_cast<const u_char*>(sv.data()), sv.size());
}
bool Specific_RE_Matcher::MatchAll(const u_char* bv, int n, std::vector<AcceptIdx>* matches) {
if ( ! dfa )

View file

@ -7,6 +7,7 @@
#include <map>
#include <set>
#include <string>
#include <string_view>
#include "zeek/CCL.h"
#include "zeek/EquivClass.h"
@ -89,6 +90,7 @@ public:
bool MatchAll(const char* s);
bool MatchAll(const String* s);
bool MatchAll(std::string_view sv);
// Compiles a set of regular expressions simultaneously.
// 'idx' contains indices associated with the expressions.
@ -104,16 +106,21 @@ public:
// Behaves as MatchAll(), consuming the complete input string.
bool MatchSet(const String* s, std::vector<AcceptIdx>& matches);
// As MatchSet() above, but taking a std::string_view.
bool MatchSet(std::string_view sv, std::vector<AcceptIdx>& matches);
// Returns the position in s just beyond where the first match
// occurs, or 0 if there is no such position in s. Note that
// if the pattern matches empty strings, matching continues
// in an attempt to match at least one character.
int Match(const char* s);
int Match(const String* s);
int Match(std::string_view sv);
int Match(const u_char* bv, int n);
int LongestMatch(const char* s);
int LongestMatch(const String* s);
int LongestMatch(std::string_view sv);
int LongestMatch(const u_char* bv, int n, bool bol = true, bool eol = true);
EquivClass* EC() { return &equiv_class; }

View file

@ -1573,10 +1573,10 @@ public:
void Clear() { matcher.reset(); }
VectorValPtr Lookup(const StringValPtr& s);
VectorValPtr Lookup(std::string_view sv);
// Delegate to matcher->MatchAll().
bool MatchAll(const StringValPtr& s);
bool MatchAll(std::string_view sv);
void GetStats(detail::DFA_State_Cache_Stats* stats) const {
if ( matcher && matcher->DFA() )
@ -1606,7 +1606,7 @@ private:
std::vector<ValPtr> matcher_yields;
};
VectorValPtr detail::TablePatternMatcher::Lookup(const StringValPtr& s) {
VectorValPtr detail::TablePatternMatcher::Lookup(std::string_view sv) {
auto results = make_intrusive<VectorVal>(vtype);
if ( ! matcher ) {
@ -1617,7 +1617,7 @@ VectorValPtr detail::TablePatternMatcher::Lookup(const StringValPtr& s) {
}
std::vector<AcceptIdx> matches;
matcher->MatchSet(s->AsString(), matches);
matcher->MatchSet(sv, matches);
for ( auto m : matches )
results->Append(matcher_yields[m]);
@ -1625,7 +1625,7 @@ VectorValPtr detail::TablePatternMatcher::Lookup(const StringValPtr& s) {
return results;
}
bool detail::TablePatternMatcher::MatchAll(const StringValPtr& s) {
bool detail::TablePatternMatcher::MatchAll(std::string_view sv) {
if ( ! matcher ) {
if ( tbl->Get()->Length() == 0 )
return false;
@ -1633,7 +1633,7 @@ bool detail::TablePatternMatcher::MatchAll(const StringValPtr& s) {
Build();
}
return matcher->MatchAll(s->AsString());
return matcher->MatchAll(sv);
}
void detail::TablePatternMatcher::Build() {
@ -2182,18 +2182,20 @@ TableValPtr TableVal::LookupSubnetValues(const SubNetVal* search) {
return nt;
}
VectorValPtr TableVal::LookupPattern(const StringValPtr& s) {
VectorValPtr TableVal::LookupPattern(const StringValPtr& s) { return LookupPattern(s->ToStdStringView()); }
VectorValPtr TableVal::LookupPattern(std::string_view sv) {
if ( ! pattern_matcher || ! GetType()->Yield() )
reporter->InternalError("LookupPattern called on wrong table type");
return pattern_matcher->Lookup(s);
return pattern_matcher->Lookup(sv);
}
bool TableVal::MatchPattern(const StringValPtr& s) {
if ( ! pattern_matcher )
reporter->InternalError("LookupPattern called on wrong table type");
return pattern_matcher->MatchAll(s);
return pattern_matcher->MatchAll(s->ToStdStringView());
}
void TableVal::GetPatternMatcherStats(detail::DFA_State_Cache_Stats* stats) const {

View file

@ -4,6 +4,8 @@
#include <sys/types.h> // for u_char
#include <array>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>
@ -898,6 +900,11 @@ public:
// Causes an internal error if called for any other kind of table.
VectorValPtr LookupPattern(const StringValPtr& s);
// For a table[pattern], return a vector of all yields matching
// the given string.
// Causes an internal error if called for any other kind of table.
VectorValPtr LookupPattern(std::string_view sv);
// For a table[pattern] or set[pattern], returns True if any of the
// patterns in the index matches the given string, else False.
// Causes an internal error if called for any other kind of table.

View file

@ -22,11 +22,7 @@ TableTopicNormalizer::TableTopicNormalizer() {
}
std::string_view TableTopicNormalizer::operator()(std::string_view topic) {
// TODO: It'd be nice if we could just lookup via string_view so we can
// avoid the allocation of the intermediary StringVal just to match
// against the patterns.
auto sv = zeek::make_intrusive<zeek::StringVal>(topic);
VectorValPtr r = topic_normalizations->LookupPattern(sv);
VectorValPtr r = topic_normalizations->LookupPattern(topic);
if ( r->Size() == 0 )
return topic;
@ -131,28 +127,44 @@ void VerboseTelemetry::OnIncomingEvent(std::string_view topic, std::string_view
namespace {
std::string determine_script_location() {
std::string result = "none";
if ( zeek::detail::call_stack.empty() )
return result;
// Cached lookup of a script location.
std::string_view determine_script_location() {
// Global cache for CallExpr pointers to their location.
static std::map<const zeek::detail::CallExpr*, std::string> location_cache;
ssize_t sidx = static_cast<ssize_t>(zeek::detail::call_stack.size()) - 1;
while ( sidx >= 0 ) {
const auto* func = zeek::detail::call_stack[sidx].func;
const auto* ce = zeek::detail::call_stack[sidx].call;
// without_zeekpath_component looks pretty expensive and might be
// better to cache the result using the ce pointer instead of computing
// it over and over again.
// Cached?
if ( auto it = location_cache.find(ce); it != location_cache.end() )
return it->second;
// Future: Ignore wrapper functions if we ever come across some.
// We only care about Broker::publish() and Cluster::publish() and
// these aren't wrapped, so currently nothing to do here.
//
// if ( ignore func ) {
// --sidx;
// continue;
// }
//
// Check Func.cc::emit_builtin_error_common() for inspiration how to
// remove wrapper function.
const auto* loc = ce->GetLocationInfo();
std::string normalized_location = zeek::util::detail::without_zeekpath_component(loc->filename);
result = normalized_location + ":" + std::to_string(loc->first_line);
break;
normalized_location.append(":");
normalized_location.append(std::to_string(loc->first_line));
auto [it, inserted] = location_cache.emplace(ce, std::move(normalized_location));
assert(inserted);
return it->second;
}
return result;
return "none";
}
} // namespace
@ -198,11 +210,10 @@ DebugTelemetry::DebugTelemetry(TopicNormalizer topic_normalizer, std::string_vie
void DebugTelemetry::OnOutgoingEvent(std::string_view topic, std::string_view handler_name,
const SerializationInfo& info) {
auto normalized_topic = topic_normalizer(topic);
std::string script_location = determine_script_location();
labels_view[topic_idx].second = normalized_topic;
labels_view[handler_idx].second = handler_name;
labels_view[script_location_idx].second = script_location;
labels_view[script_location_idx].second = determine_script_location();
const auto& hist = out->GetOrAdd(labels_view);
hist->Observe(static_cast<double>(info.Size()));

View file

@ -303,7 +303,6 @@ void WebSocketEventDispatcher::Process(const WebSocketOpen& open) {
});
if ( ! good_application_name ) {
QueueReply(WebSocketCloseReply{wsc, 1001, "Internal error"});
open.wsc->SendError("invalid_application_name", "Invalid X-Application-Name");
open.wsc->Close(1008, "Invalid X-Application-Name");

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
connected
recv code invalid_application_name context Invalid X-Application-Name
exception code 1008 reason Invalid X-Application-Name

View file

@ -0,0 +1,64 @@
# @TEST-DOC: Test a WebSocket client with an invalid X-Application-Name that is rejected.
#
# @TEST-REQUIRES: have-zeromq
# @TEST-REQUIRES: python3 -c 'import websockets.sync'
#
# @TEST-GROUP: cluster-zeromq
#
# @TEST-PORT: XPUB_PORT
# @TEST-PORT: XSUB_PORT
# @TEST-PORT: LOG_PULL_PORT
# @TEST-PORT: WEBSOCKET_PORT
#
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
# @TEST-EXEC: cp $FILES/ws/wstest.py .
#
# @TEST-EXEC: zeek -b --parse-only manager.zeek
# @TEST-EXEC: python3 -m py_compile client.py
#
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
# @TEST-EXEC: python3 client.py > client.out 2>&1
#
# @TEST-EXEC: btest-diff client.out
# @TEST-START-FILE manager.zeek
@load ./zeromq-test-bootstrap
global ping: event(msg: string, c: count) &is_used;
event zeek_init()
{
Cluster::subscribe("/test/pings");
Cluster::listen_websocket([$listen_addr=127.0.0.1, $listen_port=to_port(getenv("WEBSOCKET_PORT"))]);
}
# terminate() on the first proper client connection.
event Cluster::websocket_client_added(info: Cluster::EndpointInfo, subscriptions: string_vec)
{
terminate();
}
# @TEST-END-FILE
# @TEST-START-FILE client.py
import websockets.exceptions
import wstest
def run(ws_url):
try:
with wstest.connect("ws1", ws_url, additional_headers={"X-Application-Name": "!!invalid~~"}) as tc:
print("connected")
while True:
err = tc.recv_json()
print("recv", "code", err["code"], "context", err["context"])
except websockets.exceptions.ConnectionClosedError as e:
print("exception", "code", e.code, "reason", e.reason)
# For terminating the Zeek server.
with wstest.connect("ws2", ws_url) as tc:
tc.hello_v1([])
if __name__ == "__main__":
wstest.main(run, wstest.WS4_URL_V1)
# @TEST-END-FILE