mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge remote-tracking branch 'origin/topic/awelzel/fix-cluster-publish-any'
* origin/topic/awelzel/fix-cluster-publish-any: cluster/Backend: Handle unspecified table/set cluster: Fix Cluster::publish() of Broker::Data cluster: Be noisy when attempting to connect to an unknown node
This commit is contained in:
commit
25554fa668
17 changed files with 519 additions and 9 deletions
29
CHANGES
29
CHANGES
|
@ -1,3 +1,32 @@
|
|||
7.1.0-dev.796 | 2024-12-12 13:16:57 -0700
|
||||
|
||||
* cluster/Backend: Handle unspecified table/set (Arne Welzel, Corelight)
|
||||
|
||||
Same as what we do in Broker. Use the expected type if publishing
|
||||
a table() or set() parameter.
|
||||
|
||||
This fixes issues when switching sumstats to Cluster::publish()
|
||||
|
||||
* cluster: Fix Cluster::publish() of Broker::Data (Arne Welzel, Corelight)
|
||||
|
||||
The broker serializer leverages the existing data_to_val() function.
|
||||
During unserialization, if the destination type is any, the logic
|
||||
simply wraps the broker::data value into a Broker::Data record.
|
||||
Therefore, events with any parameters are currently exposed to
|
||||
the Broker::Data type.
|
||||
|
||||
There is a bigger issue in that re-publishing such Broker::Data
|
||||
instances would encode them as a normal record. Explicitly prevent
|
||||
this by serializing the contained data value directly instead, similar
|
||||
to what Broker already did when publishing a record.
|
||||
|
||||
* cluster: Be noisy when attempting to connect to an unknown node (Arne Welzel, Corelight)
|
||||
|
||||
Mostly due to spending too much time wondering why nodes didn't connect
|
||||
when there was a mismatch between "manager" and "manager-1" in the
|
||||
cluster layout. Remove manager from test-all-policy-cluster test to
|
||||
avoid connection attempts in this test.
|
||||
|
||||
7.1.0-dev.792 | 2024-12-12 11:35:08 -0700
|
||||
|
||||
* Bump auxil/spicy to latest development snapshot (Benjamin Bannier, Corelight)
|
||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
|||
7.1.0-dev.792
|
||||
7.1.0-dev.796
|
||||
|
|
|
@ -36,6 +36,8 @@ function connect_peer(node_type: NodeType, node_name: string)
|
|||
status));
|
||||
return;
|
||||
}
|
||||
|
||||
Reporter::warning(fmt("connect_peer: node '%s' (%s) not found", node_name, node_type));
|
||||
}
|
||||
|
||||
function connect_peers_with_type(node_type: NodeType)
|
||||
|
|
|
@ -539,13 +539,22 @@ std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id(
|
|||
bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) {
|
||||
broker::vector xs;
|
||||
xs.reserve(event.args.size());
|
||||
|
||||
for ( const auto& a : event.args ) {
|
||||
auto r = detail::val_to_data(a.get());
|
||||
if ( ! r ) {
|
||||
if ( a->GetType() == zeek::BifType::Record::Broker::Data ) {
|
||||
// When encountering a Broker::Data instance within args, pick out
|
||||
// the broker::data directly to avoid double encoding of the record.
|
||||
const auto& val = a->AsRecordVal()->GetField(0);
|
||||
auto* data_val = static_cast<zeek::Broker::detail::DataVal*>(val.get());
|
||||
xs.emplace_back(data_val->data);
|
||||
}
|
||||
else if ( auto r = detail::val_to_data(a.get()) ) {
|
||||
xs.emplace_back(std::move(r.value()));
|
||||
}
|
||||
else {
|
||||
Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str());
|
||||
return false;
|
||||
}
|
||||
xs.emplace_back(std::move(r.value()));
|
||||
}
|
||||
|
||||
std::string name(event.HandlerName());
|
||||
|
|
|
@ -36,9 +36,15 @@ std::optional<zeek::Args> detail::check_args(const zeek::FuncValPtr& handler, ze
|
|||
|
||||
for ( size_t i = 0; i < args.size(); i++ ) {
|
||||
const auto& a = args[i];
|
||||
const auto& got_type = a->GetType();
|
||||
auto got_type = a->GetType();
|
||||
const auto& expected_type = types[i];
|
||||
|
||||
// If called with an unspecified table or set, adopt the expected type
|
||||
// as otherwise same_type() fails.
|
||||
if ( got_type->Tag() == TYPE_TABLE && got_type->AsTableType()->IsUnspecifiedTable() )
|
||||
if ( expected_type->Tag() == TYPE_TABLE && got_type->IsSet() == expected_type->IsSet() )
|
||||
got_type = expected_type;
|
||||
|
||||
if ( ! same_type(got_type, expected_type) ) {
|
||||
zeek::reporter->Error("event parameter #%zu type mismatch, got %s, expecting %s", i + 1,
|
||||
zeek::obj_desc_short(got_type.get()).c_str(),
|
||||
|
|
|
@ -21,6 +21,9 @@ std::optional<zeek::cluster::detail::Event> to_cluster_event(const zeek::RecordV
|
|||
const auto& func = rec->GetField<zeek::FuncVal>(0);
|
||||
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
|
||||
|
||||
if ( ! func )
|
||||
return std::nullopt;
|
||||
|
||||
const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName());
|
||||
if ( ! eh ) {
|
||||
zeek::emit_builtin_error(
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include "zeek/broker/Data.h"
|
||||
#include "zeek/cluster/Backend.h"
|
||||
|
||||
#include "broker/data.bif.h"
|
||||
#include "broker/data_envelope.hh"
|
||||
#include "broker/error.hh"
|
||||
#include "broker/format/json.hh"
|
||||
|
@ -30,8 +31,16 @@ namespace {
|
|||
std::optional<broker::zeek::Event> to_broker_event(const detail::Event& ev) {
|
||||
broker::vector xs;
|
||||
xs.reserve(ev.args.size());
|
||||
|
||||
for ( const auto& a : ev.args ) {
|
||||
if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) {
|
||||
if ( a->GetType() == zeek::BifType::Record::Broker::Data ) {
|
||||
// When encountering a Broker::Data instance within args, pick out
|
||||
// the broker::data directly to avoid double encoding, Broker::Data.
|
||||
const auto& val = a->AsRecordVal()->GetField(0);
|
||||
auto* data_val = static_cast<zeek::Broker::detail::DataVal*>(val.get());
|
||||
xs.emplace_back(data_val->data);
|
||||
}
|
||||
else if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) {
|
||||
xs.emplace_back(std::move(res.value()));
|
||||
}
|
||||
else {
|
||||
|
@ -85,6 +94,11 @@ std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev) {
|
|||
for ( size_t i = 0; i < args.size(); ++i ) {
|
||||
const auto& expected_type = arg_types[i];
|
||||
auto arg = args[i].to_data();
|
||||
// XXX: data_to_val() uses Broker::Data for `any` type parameters, exposing
|
||||
// Broker::Data to the script-layer even if Broker isn't used.
|
||||
//
|
||||
// This might be part of the API, but seems we could also use the concrete
|
||||
// Val type if the serializer encodes that information in the message.
|
||||
auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get());
|
||||
if ( val )
|
||||
vl.emplace_back(std::move(val));
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
node_up, worker-1
|
||||
sending pings, 0, count, 1
|
||||
got pong, 1, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 2, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 3, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 4, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 5, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 6, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 7, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 8, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 9, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 10, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 11, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 12, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}]
|
||||
sending pings, 1, string, a string
|
||||
got pong, 13, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 14, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 15, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 16, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 17, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 18, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 19, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 20, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 21, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 22, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 23, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 24, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}]
|
||||
sending pings, 2, port, 42/tcp
|
||||
got pong, 25, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 26, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 27, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 28, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 29, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 30, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 31, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 32, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 33, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 34, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 35, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 36, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
sending pings, 3, vector of count, [1, 2, 3]
|
||||
got pong, 37, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 38, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 39, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 40, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 41, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 42, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 43, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 44, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 45, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 46, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 47, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 48, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
sending pings, 4, time, 42.0
|
||||
got pong, 49, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 50, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 51, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 52, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 53, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 54, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 55, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 56, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 57, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 58, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 59, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 60, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}]
|
|
@ -0,0 +1,17 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got finish!
|
|
@ -0,0 +1,37 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
node_up, worker-1
|
||||
sending pings, 0, count, 1
|
||||
got pong, 1, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 2, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 3, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 4, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 5, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||
got pong, 6, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||
sending pings, 1, string, a string
|
||||
got pong, 7, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 8, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 9, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 10, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 11, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||
got pong, 12, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||
sending pings, 2, port, 42/tcp
|
||||
got pong, 13, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 14, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 15, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 16, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 17, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||
got pong, 18, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||
sending pings, 3, vector of count, [1, 2, 3]
|
||||
got pong, 19, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 20, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 21, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 22, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 23, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got pong, 24, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
sending pings, 4, time, 42.0
|
||||
got pong, 25, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 26, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 27, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 28, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 29, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got pong, 30, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
|
@ -0,0 +1,17 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||
got finish!
|
|
@ -0,0 +1,5 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
node_up, worker-1
|
||||
pong_table, hello, ResultTable, {\x0a\x0a}
|
||||
pong_set, hello, ResultSet, {\x0a\x0a}
|
||||
node_down, worker-1
|
|
@ -0,0 +1,3 @@
|
|||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||
ping_table, hello, ResultTable, {\x0a\x0a}
|
||||
ping_set, hello, ResultSet, {\x0a\x0a}
|
107
testing/btest/cluster/broker/publish-any.zeek
Normal file
107
testing/btest/cluster/broker/publish-any.zeek
Normal file
|
@ -0,0 +1,107 @@
|
|||
# @TEST-DOC: Send any values and observe behavior using broker.
|
||||
#
|
||||
# @TEST-PORT: BROKER_PORT1
|
||||
# @TEST-PORT: BROKER_PORT2
|
||||
#
|
||||
# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
|
||||
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek"
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-wait 10
|
||||
# @TEST-EXEC: btest-diff ./manager/.stdout
|
||||
# @TEST-EXEC: btest-diff ./worker-1/.stdout
|
||||
|
||||
@TEST-START-FILE cluster-layout.zeek
|
||||
redef Cluster::nodes = {
|
||||
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
|
||||
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager"],
|
||||
};
|
||||
@TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE common.zeek
|
||||
redef Log::default_rotation_interval = 0sec;
|
||||
|
||||
global finish: event() &is_used;
|
||||
global ping: event(c: count, what: string, val: any) &is_used;
|
||||
global pong: event(c: count, what: string, val: any) &is_used;
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE manager.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
global i = 0;
|
||||
global pongs = 0;
|
||||
|
||||
event send_any()
|
||||
{
|
||||
if ( i > 4 )
|
||||
return;
|
||||
|
||||
local val: any;
|
||||
if ( i == 0 )
|
||||
val = 1;
|
||||
else if ( i == 1 )
|
||||
val = "a string";
|
||||
else if ( i == 2 )
|
||||
val = 42/tcp;
|
||||
else if ( i == 3 )
|
||||
val = vector(1, 2, 3);
|
||||
else
|
||||
val = double_to_time(42.0);
|
||||
|
||||
print "sending pings", i, type_name(val), val;
|
||||
Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val);
|
||||
Broker::publish(Cluster::worker_topic, ping, i, type_name(val), val);
|
||||
local e = Cluster::make_event(ping, i, type_name(val), val);
|
||||
Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
|
||||
++i;
|
||||
|
||||
schedule 0.05sec { send_any() };
|
||||
}
|
||||
|
||||
event pong(c: count, what: string, val: any)
|
||||
{
|
||||
++pongs;
|
||||
print "got pong", pongs, "for ping", c, what, type_name(val), val;
|
||||
|
||||
# We send 5 pings in 3 different variations and
|
||||
# get 4 one pong for each.
|
||||
if ( pongs == 60 )
|
||||
Cluster::publish(Cluster::worker_topic, finish);
|
||||
}
|
||||
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
print "node_up", name;
|
||||
schedule 0.1sec { send_any() };
|
||||
}
|
||||
|
||||
event Cluster::node_down(name: string, id: string)
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
||||
|
||||
|
||||
# @TEST-START-FILE worker.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
event ping(c: count, what: string, val: any)
|
||||
{
|
||||
print "got ping", c, what, type_name(val), cat(val);
|
||||
Cluster::publish(Cluster::manager_topic, pong, c, what + " (cluster publish)", val);
|
||||
local e = Cluster::make_event(pong, c, what + " (cluster event )", val);
|
||||
Cluster::publish(Cluster::manager_topic, e);
|
||||
|
||||
Broker::publish(Cluster::manager_topic, pong, c, what + " (broker publish )", val);
|
||||
local be = Broker::make_event(pong, c, what + " (broker event )", val);
|
||||
Broker::publish(Cluster::manager_topic, be);
|
||||
}
|
||||
|
||||
event finish()
|
||||
{
|
||||
print "got finish!";
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
104
testing/btest/cluster/generic/publish-any.zeek
Normal file
104
testing/btest/cluster/generic/publish-any.zeek
Normal file
|
@ -0,0 +1,104 @@
|
|||
# @TEST-DOC: Send any values and observe behavior using zeromq.
|
||||
#
|
||||
# @TEST-REQUIRES: have-zeromq
|
||||
#
|
||||
# @TEST-PORT: XPUB_PORT
|
||||
# @TEST-PORT: XSUB_PORT
|
||||
# @TEST-PORT: LOG_PULL_PORT
|
||||
#
|
||||
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
|
||||
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||
#
|
||||
# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
|
||||
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek"
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-wait 10
|
||||
# @TEST-EXEC: btest-diff ./manager/.stdout
|
||||
# @TEST-EXEC: btest-diff ./worker-1/.stdout
|
||||
|
||||
# @TEST-START-FILE common.zeek
|
||||
@load ./zeromq-test-bootstrap.zeek
|
||||
|
||||
redef Log::default_rotation_interval = 0sec;
|
||||
|
||||
global finish: event() &is_used;
|
||||
global ping: event(c: count, what: string, val: any) &is_used;
|
||||
global pong: event(c: count, what: string, val: any) &is_used;
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE manager.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
global i = 0;
|
||||
global pongs = 0;
|
||||
|
||||
event send_any()
|
||||
{
|
||||
if ( i > 4 )
|
||||
return;
|
||||
|
||||
local val: any;
|
||||
if ( i == 0 )
|
||||
val = 1;
|
||||
else if ( i == 1 )
|
||||
val = "a string";
|
||||
else if ( i == 2 )
|
||||
val = 42/tcp;
|
||||
else if ( i == 3 )
|
||||
val = vector(1, 2, 3);
|
||||
else
|
||||
val = double_to_time(42.0);
|
||||
|
||||
print "sending pings", i, type_name(val), val;
|
||||
Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val);
|
||||
Cluster::publish(Cluster::worker_topic, ping, i, type_name(val), val);
|
||||
local e = Cluster::make_event(ping, i, type_name(val), val);
|
||||
Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
|
||||
++i;
|
||||
|
||||
schedule 0.05sec { send_any() };
|
||||
}
|
||||
|
||||
event pong(c: count, what: string, val: any)
|
||||
{
|
||||
++pongs;
|
||||
print "got pong", pongs, "with", c, what, type_name(val), val;
|
||||
|
||||
# We send 5 pings in 3 different variations and
|
||||
# get two pongs for each.
|
||||
if ( pongs == 30 )
|
||||
Cluster::publish(Cluster::worker_topic, finish);
|
||||
}
|
||||
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
print "node_up", name;
|
||||
schedule 0.1sec { send_any() };
|
||||
}
|
||||
|
||||
event Cluster::node_down(name: string, id: string)
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
||||
|
||||
|
||||
# @TEST-START-FILE worker.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
event ping(c: count, what: string, val: any)
|
||||
{
|
||||
print "got ping", c, what, type_name(val), cat(val);
|
||||
Cluster::publish(Cluster::manager_topic, pong, c, what + " (cluster publish)", val);
|
||||
local e = Cluster::make_event(pong, c, what + " (cluster event )", val);
|
||||
Cluster::publish(Cluster::manager_topic, e);
|
||||
}
|
||||
|
||||
event finish()
|
||||
{
|
||||
print "got finish!";
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
90
testing/btest/cluster/generic/publish-unspecified.zeek
Normal file
90
testing/btest/cluster/generic/publish-unspecified.zeek
Normal file
|
@ -0,0 +1,90 @@
|
|||
# @TEST-DOC: Startup a manager running the ZeroMQ proxy thread, a worker connects and the manager sends a finish event to terminate the worker.
|
||||
#
|
||||
# @TEST-REQUIRES: have-zeromq
|
||||
#
|
||||
# @TEST-GROUP: cluster-zeromq
|
||||
#
|
||||
# @TEST-PORT: XPUB_PORT
|
||||
# @TEST-PORT: XSUB_PORT
|
||||
# @TEST-PORT: LOG_PULL_PORT
|
||||
#
|
||||
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-simple.zeek cluster-layout.zeek
|
||||
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||
#
|
||||
# @TEST-EXEC: zeek --parse-only manager.zeek worker.zeek
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek >out"
|
||||
# @TEST-EXEC: btest-bg-run worker "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek >out"
|
||||
#
|
||||
# @TEST-EXEC: btest-bg-wait 30
|
||||
# @TEST-EXEC: btest-diff ./manager/out
|
||||
# @TEST-EXEC: btest-diff ./worker/out
|
||||
|
||||
|
||||
# @TEST-START-FILE common.zeek
|
||||
@load ./zeromq-test-bootstrap
|
||||
|
||||
type ResultTable: table[string] of count;
|
||||
type ResultSet : set[count];
|
||||
|
||||
global ping_table: event(msg: string, t: ResultTable) &is_used;
|
||||
global pong_table: event(msg: string, t: ResultTable) &is_used;
|
||||
|
||||
global ping_set: event(msg: string, s: ResultSet) &is_used;
|
||||
global pong_set: event(msg: string, s: ResultSet) &is_used;
|
||||
|
||||
global finish: event() &is_used;
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE manager.zeek
|
||||
@load ./common.zeek
|
||||
# If a node comes up that isn't us, send it a finish event.
|
||||
event Cluster::node_up(name: string, id: string)
|
||||
{
|
||||
print "node_up", name;
|
||||
Cluster::publish(Cluster::nodeid_topic(id), ping_table, "hello", table());
|
||||
Cluster::publish(Cluster::nodeid_topic(id), ping_set, "hello", set());
|
||||
}
|
||||
|
||||
|
||||
event pong_table(msg: string, t: ResultTable)
|
||||
{
|
||||
print "pong_table", msg, type_name(t), cat(t);
|
||||
}
|
||||
|
||||
event pong_set(msg: string, t: ResultSet)
|
||||
{
|
||||
print "pong_set", msg, type_name(t), cat(t);
|
||||
Cluster::publish(Cluster::worker_topic, finish);
|
||||
}
|
||||
|
||||
# If the worker vanishes, finish the test.
|
||||
event Cluster::node_down(name: string, id: string)
|
||||
{
|
||||
print "node_down", name;
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
||||
|
||||
# @TEST-START-FILE worker.zeek
|
||||
@load ./common.zeek
|
||||
|
||||
event ping_table(msg: string, t: ResultTable) &is_used
|
||||
{
|
||||
print "ping_table", msg, type_name(t), cat(t);
|
||||
local e = Cluster::make_event(pong_table, msg, table());
|
||||
Cluster::publish(Cluster::manager_topic, e);
|
||||
}
|
||||
|
||||
event ping_set(msg: string, t: ResultSet) &is_used
|
||||
{
|
||||
print "ping_set", msg, type_name(t), cat(t);
|
||||
local e = Cluster::make_event(pong_set, msg, set());
|
||||
Cluster::publish(Cluster::manager_topic, e);
|
||||
}
|
||||
|
||||
event finish()
|
||||
{
|
||||
terminate();
|
||||
}
|
||||
# @TEST-END-FILE
|
|
@ -23,9 +23,9 @@ redef Cluster::backend = Cluster::CLUSTER_BACKEND_BROKER;
|
|||
@TEST-START-FILE cluster-layout.zeek
|
||||
redef Cluster::nodes = {
|
||||
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
|
||||
["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager"],
|
||||
["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3")), $manager="manager"],
|
||||
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT4")), $manager="manager"],
|
||||
["logger-1"] = [$node_type=Cluster::LOGGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2"))],
|
||||
["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT3"))],
|
||||
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT4"))],
|
||||
};
|
||||
|
||||
@TEST-END-FILE
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue