mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
cluster: Fix Cluster::publish() of Broker::Data
The broker serializer leverages the existing data_to_val() function. During unserialization, if the destination type is any, the logic simply wraps the broker::data value into a Broker::Data record. Therefore, events with any parameters are currently exposed to the Broker::Data type. There is a bigger issue in that re-publishing such Broker::Data instances would encode them as a normal record. Explicitly prevent this by serializing the contained data value directly instead, similar to what Broker already did when publishing a record.
This commit is contained in:
parent
271fc15041
commit
d9a74cf32d
9 changed files with 379 additions and 4 deletions
|
@ -539,13 +539,22 @@ std::string Manager::NodeID() const { return to_string(bstate->endpoint.node_id(
|
||||||
bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) {
|
bool Manager::DoPublishEvent(const std::string& topic, const cluster::detail::Event& event) {
|
||||||
broker::vector xs;
|
broker::vector xs;
|
||||||
xs.reserve(event.args.size());
|
xs.reserve(event.args.size());
|
||||||
|
|
||||||
for ( const auto& a : event.args ) {
|
for ( const auto& a : event.args ) {
|
||||||
auto r = detail::val_to_data(a.get());
|
if ( a->GetType() == zeek::BifType::Record::Broker::Data ) {
|
||||||
if ( ! r ) {
|
// When encountering a Broker::Data instance within args, pick out
|
||||||
|
// the broker::data directly to avoid double encoding of the record.
|
||||||
|
const auto& val = a->AsRecordVal()->GetField(0);
|
||||||
|
auto* data_val = static_cast<zeek::Broker::detail::DataVal*>(val.get());
|
||||||
|
xs.emplace_back(data_val->data);
|
||||||
|
}
|
||||||
|
else if ( auto r = detail::val_to_data(a.get()) ) {
|
||||||
|
xs.emplace_back(std::move(r.value()));
|
||||||
|
}
|
||||||
|
else {
|
||||||
Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str());
|
Error("Failed to convert %s to broker::data", zeek::obj_desc(a.get()).c_str());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
xs.emplace_back(std::move(r.value()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string name(event.HandlerName());
|
std::string name(event.HandlerName());
|
||||||
|
|
|
@ -21,6 +21,9 @@ std::optional<zeek::cluster::detail::Event> to_cluster_event(const zeek::RecordV
|
||||||
const auto& func = rec->GetField<zeek::FuncVal>(0);
|
const auto& func = rec->GetField<zeek::FuncVal>(0);
|
||||||
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
|
const auto& vargs = rec->GetField<zeek::VectorVal>(1);
|
||||||
|
|
||||||
|
if ( ! func )
|
||||||
|
return std::nullopt;
|
||||||
|
|
||||||
const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName());
|
const auto& eh = zeek::event_registry->Lookup(func->AsFuncPtr()->GetName());
|
||||||
if ( ! eh ) {
|
if ( ! eh ) {
|
||||||
zeek::emit_builtin_error(
|
zeek::emit_builtin_error(
|
||||||
|
|
|
@ -10,6 +10,7 @@
|
||||||
#include "zeek/broker/Data.h"
|
#include "zeek/broker/Data.h"
|
||||||
#include "zeek/cluster/Backend.h"
|
#include "zeek/cluster/Backend.h"
|
||||||
|
|
||||||
|
#include "broker/data.bif.h"
|
||||||
#include "broker/data_envelope.hh"
|
#include "broker/data_envelope.hh"
|
||||||
#include "broker/error.hh"
|
#include "broker/error.hh"
|
||||||
#include "broker/format/json.hh"
|
#include "broker/format/json.hh"
|
||||||
|
@ -30,8 +31,16 @@ namespace {
|
||||||
std::optional<broker::zeek::Event> to_broker_event(const detail::Event& ev) {
|
std::optional<broker::zeek::Event> to_broker_event(const detail::Event& ev) {
|
||||||
broker::vector xs;
|
broker::vector xs;
|
||||||
xs.reserve(ev.args.size());
|
xs.reserve(ev.args.size());
|
||||||
|
|
||||||
for ( const auto& a : ev.args ) {
|
for ( const auto& a : ev.args ) {
|
||||||
if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) {
|
if ( a->GetType() == zeek::BifType::Record::Broker::Data ) {
|
||||||
|
// When encountering a Broker::Data instance within args, pick out
|
||||||
|
// the broker::data directly to avoid double encoding, Broker::Data.
|
||||||
|
const auto& val = a->AsRecordVal()->GetField(0);
|
||||||
|
auto* data_val = static_cast<zeek::Broker::detail::DataVal*>(val.get());
|
||||||
|
xs.emplace_back(data_val->data);
|
||||||
|
}
|
||||||
|
else if ( auto res = zeek::Broker::detail::val_to_data(a.get()) ) {
|
||||||
xs.emplace_back(std::move(res.value()));
|
xs.emplace_back(std::move(res.value()));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -85,6 +94,11 @@ std::optional<detail::Event> to_zeek_event(const broker::zeek::Event& ev) {
|
||||||
for ( size_t i = 0; i < args.size(); ++i ) {
|
for ( size_t i = 0; i < args.size(); ++i ) {
|
||||||
const auto& expected_type = arg_types[i];
|
const auto& expected_type = arg_types[i];
|
||||||
auto arg = args[i].to_data();
|
auto arg = args[i].to_data();
|
||||||
|
// XXX: data_to_val() uses Broker::Data for `any` type parameters, exposing
|
||||||
|
// Broker::Data to the script-layer even if Broker isn't used.
|
||||||
|
//
|
||||||
|
// This might be part of the API, but seems we could also use the concrete
|
||||||
|
// Val type if the serializer encodes that information in the message.
|
||||||
auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get());
|
auto val = zeek::Broker::detail::data_to_val(arg, expected_type.get());
|
||||||
if ( val )
|
if ( val )
|
||||||
vl.emplace_back(std::move(val));
|
vl.emplace_back(std::move(val));
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
node_up, worker-1
|
||||||
|
sending pings, 0, count, 1
|
||||||
|
got pong, 1, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 2, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 3, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 4, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 5, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 6, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 7, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 8, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 9, for ping, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 10, for ping, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 11, for ping, 0, count (broker publish ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 12, for ping, 0, count (broker event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
sending pings, 1, string, a string
|
||||||
|
got pong, 13, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 14, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 15, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 16, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 17, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 18, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 19, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 20, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 21, for ping, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 22, for ping, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 23, for ping, 1, string (broker publish ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 24, for ping, 1, string (broker event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
sending pings, 2, port, 42/tcp
|
||||||
|
got pong, 25, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 26, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 27, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 28, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 29, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 30, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 31, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 32, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 33, for ping, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 34, for ping, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 35, for ping, 2, port (broker publish ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 36, for ping, 2, port (broker event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
sending pings, 3, vector of count, [1, 2, 3]
|
||||||
|
got pong, 37, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 38, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 39, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 40, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 41, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 42, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 43, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 44, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 45, for ping, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 46, for ping, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 47, for ping, 3, vector of count (broker publish ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 48, for ping, 3, vector of count (broker event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
sending pings, 4, time, 42.0
|
||||||
|
got pong, 49, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 50, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 51, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 52, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 53, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 54, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 55, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 56, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 57, for ping, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 58, for ping, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 59, for ping, 4, time (broker publish ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 60, for ping, 4, time (broker event ), Broker::Data, [data=broker::data{42000000000ns}]
|
|
@ -0,0 +1,17 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||||
|
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||||
|
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||||
|
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||||
|
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||||
|
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||||
|
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got finish!
|
|
@ -0,0 +1,37 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
node_up, worker-1
|
||||||
|
sending pings, 0, count, 1
|
||||||
|
got pong, 1, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 2, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 3, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 4, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 5, with, 0, count (cluster publish), Broker::Data, [data=broker::data{1}]
|
||||||
|
got pong, 6, with, 0, count (cluster event ), Broker::Data, [data=broker::data{1}]
|
||||||
|
sending pings, 1, string, a string
|
||||||
|
got pong, 7, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 8, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 9, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 10, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 11, with, 1, string (cluster publish), Broker::Data, [data=broker::data{a string}]
|
||||||
|
got pong, 12, with, 1, string (cluster event ), Broker::Data, [data=broker::data{a string}]
|
||||||
|
sending pings, 2, port, 42/tcp
|
||||||
|
got pong, 13, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 14, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 15, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 16, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 17, with, 2, port (cluster publish), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got pong, 18, with, 2, port (cluster event ), Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
sending pings, 3, vector of count, [1, 2, 3]
|
||||||
|
got pong, 19, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 20, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 21, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 22, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 23, with, 3, vector of count (cluster publish), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got pong, 24, with, 3, vector of count (cluster event ), Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
sending pings, 4, time, 42.0
|
||||||
|
got pong, 25, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 26, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 27, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 28, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 29, with, 4, time (cluster publish), Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got pong, 30, with, 4, time (cluster event ), Broker::Data, [data=broker::data{42000000000ns}]
|
|
@ -0,0 +1,17 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||||
|
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||||
|
got ping, 0, count, Broker::Data, [data=broker::data{1}]
|
||||||
|
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||||
|
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||||
|
got ping, 1, string, Broker::Data, [data=broker::data{a string}]
|
||||||
|
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got ping, 2, port, Broker::Data, [data=broker::data{42/tcp}]
|
||||||
|
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got ping, 3, vector of count, Broker::Data, [data=broker::data{(1, 2, 3)}]
|
||||||
|
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got ping, 4, time, Broker::Data, [data=broker::data{42000000000ns}]
|
||||||
|
got finish!
|
107
testing/btest/cluster/broker/publish-any.zeek
Normal file
107
testing/btest/cluster/broker/publish-any.zeek
Normal file
|
@ -0,0 +1,107 @@
|
||||||
|
# @TEST-DOC: Send any values and observe behavior using broker.
|
||||||
|
#
|
||||||
|
# @TEST-PORT: BROKER_PORT1
|
||||||
|
# @TEST-PORT: BROKER_PORT2
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
|
||||||
|
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek"
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-wait 10
|
||||||
|
# @TEST-EXEC: btest-diff ./manager/.stdout
|
||||||
|
# @TEST-EXEC: btest-diff ./worker-1/.stdout
|
||||||
|
|
||||||
|
@TEST-START-FILE cluster-layout.zeek
|
||||||
|
redef Cluster::nodes = {
|
||||||
|
["manager"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT1"))],
|
||||||
|
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=to_port(getenv("BROKER_PORT2")), $manager="manager"],
|
||||||
|
};
|
||||||
|
@TEST-END-FILE
|
||||||
|
|
||||||
|
# @TEST-START-FILE common.zeek
|
||||||
|
redef Log::default_rotation_interval = 0sec;
|
||||||
|
|
||||||
|
global finish: event() &is_used;
|
||||||
|
global ping: event(c: count, what: string, val: any) &is_used;
|
||||||
|
global pong: event(c: count, what: string, val: any) &is_used;
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
# @TEST-START-FILE manager.zeek
|
||||||
|
@load ./common.zeek
|
||||||
|
|
||||||
|
global i = 0;
|
||||||
|
global pongs = 0;
|
||||||
|
|
||||||
|
event send_any()
|
||||||
|
{
|
||||||
|
if ( i > 4 )
|
||||||
|
return;
|
||||||
|
|
||||||
|
local val: any;
|
||||||
|
if ( i == 0 )
|
||||||
|
val = 1;
|
||||||
|
else if ( i == 1 )
|
||||||
|
val = "a string";
|
||||||
|
else if ( i == 2 )
|
||||||
|
val = 42/tcp;
|
||||||
|
else if ( i == 3 )
|
||||||
|
val = vector(1, 2, 3);
|
||||||
|
else
|
||||||
|
val = double_to_time(42.0);
|
||||||
|
|
||||||
|
print "sending pings", i, type_name(val), val;
|
||||||
|
Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val);
|
||||||
|
Broker::publish(Cluster::worker_topic, ping, i, type_name(val), val);
|
||||||
|
local e = Cluster::make_event(ping, i, type_name(val), val);
|
||||||
|
Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
schedule 0.05sec { send_any() };
|
||||||
|
}
|
||||||
|
|
||||||
|
event pong(c: count, what: string, val: any)
|
||||||
|
{
|
||||||
|
++pongs;
|
||||||
|
print "got pong", pongs, "for ping", c, what, type_name(val), val;
|
||||||
|
|
||||||
|
# We send 5 pings in 3 different variations and
|
||||||
|
# get 4 one pong for each.
|
||||||
|
if ( pongs == 60 )
|
||||||
|
Cluster::publish(Cluster::worker_topic, finish);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Cluster::node_up(name: string, id: string)
|
||||||
|
{
|
||||||
|
print "node_up", name;
|
||||||
|
schedule 0.1sec { send_any() };
|
||||||
|
}
|
||||||
|
|
||||||
|
event Cluster::node_down(name: string, id: string)
|
||||||
|
{
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
|
||||||
|
# @TEST-START-FILE worker.zeek
|
||||||
|
@load ./common.zeek
|
||||||
|
|
||||||
|
event ping(c: count, what: string, val: any)
|
||||||
|
{
|
||||||
|
print "got ping", c, what, type_name(val), cat(val);
|
||||||
|
Cluster::publish(Cluster::manager_topic, pong, c, what + " (cluster publish)", val);
|
||||||
|
local e = Cluster::make_event(pong, c, what + " (cluster event )", val);
|
||||||
|
Cluster::publish(Cluster::manager_topic, e);
|
||||||
|
|
||||||
|
Broker::publish(Cluster::manager_topic, pong, c, what + " (broker publish )", val);
|
||||||
|
local be = Broker::make_event(pong, c, what + " (broker event )", val);
|
||||||
|
Broker::publish(Cluster::manager_topic, be);
|
||||||
|
}
|
||||||
|
|
||||||
|
event finish()
|
||||||
|
{
|
||||||
|
print "got finish!";
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
104
testing/btest/cluster/generic/publish-any.zeek
Normal file
104
testing/btest/cluster/generic/publish-any.zeek
Normal file
|
@ -0,0 +1,104 @@
|
||||||
|
# @TEST-DOC: Send any values and observe behavior using zeromq.
|
||||||
|
#
|
||||||
|
# @TEST-REQUIRES: have-zeromq
|
||||||
|
#
|
||||||
|
# @TEST-PORT: XPUB_PORT
|
||||||
|
# @TEST-PORT: XSUB_PORT
|
||||||
|
# @TEST-PORT: LOG_PULL_PORT
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: cp $FILES/zeromq/cluster-layout-no-logger.zeek cluster-layout.zeek
|
||||||
|
# @TEST-EXEC: cp $FILES/zeromq/test-bootstrap.zeek zeromq-test-bootstrap.zeek
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: zeek -b --parse-only common.zeek manager.zeek worker.zeek
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-run manager "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=manager zeek -b ../manager.zeek"
|
||||||
|
# @TEST-EXEC: btest-bg-run worker-1 "ZEEKPATH=$ZEEKPATH:.. && CLUSTER_NODE=worker-1 zeek -b ../worker.zeek"
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-wait 10
|
||||||
|
# @TEST-EXEC: btest-diff ./manager/.stdout
|
||||||
|
# @TEST-EXEC: btest-diff ./worker-1/.stdout
|
||||||
|
|
||||||
|
# @TEST-START-FILE common.zeek
|
||||||
|
@load ./zeromq-test-bootstrap.zeek
|
||||||
|
|
||||||
|
redef Log::default_rotation_interval = 0sec;
|
||||||
|
|
||||||
|
global finish: event() &is_used;
|
||||||
|
global ping: event(c: count, what: string, val: any) &is_used;
|
||||||
|
global pong: event(c: count, what: string, val: any) &is_used;
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
# @TEST-START-FILE manager.zeek
|
||||||
|
@load ./common.zeek
|
||||||
|
|
||||||
|
global i = 0;
|
||||||
|
global pongs = 0;
|
||||||
|
|
||||||
|
event send_any()
|
||||||
|
{
|
||||||
|
if ( i > 4 )
|
||||||
|
return;
|
||||||
|
|
||||||
|
local val: any;
|
||||||
|
if ( i == 0 )
|
||||||
|
val = 1;
|
||||||
|
else if ( i == 1 )
|
||||||
|
val = "a string";
|
||||||
|
else if ( i == 2 )
|
||||||
|
val = 42/tcp;
|
||||||
|
else if ( i == 3 )
|
||||||
|
val = vector(1, 2, 3);
|
||||||
|
else
|
||||||
|
val = double_to_time(42.0);
|
||||||
|
|
||||||
|
print "sending pings", i, type_name(val), val;
|
||||||
|
Cluster::publish_hrw(Cluster::worker_pool, cat(i), ping, i, type_name(val), val);
|
||||||
|
Cluster::publish(Cluster::worker_topic, ping, i, type_name(val), val);
|
||||||
|
local e = Cluster::make_event(ping, i, type_name(val), val);
|
||||||
|
Cluster::publish_hrw(Cluster::worker_pool, cat(i), e);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
schedule 0.05sec { send_any() };
|
||||||
|
}
|
||||||
|
|
||||||
|
event pong(c: count, what: string, val: any)
|
||||||
|
{
|
||||||
|
++pongs;
|
||||||
|
print "got pong", pongs, "with", c, what, type_name(val), val;
|
||||||
|
|
||||||
|
# We send 5 pings in 3 different variations and
|
||||||
|
# get two pongs for each.
|
||||||
|
if ( pongs == 30 )
|
||||||
|
Cluster::publish(Cluster::worker_topic, finish);
|
||||||
|
}
|
||||||
|
|
||||||
|
event Cluster::node_up(name: string, id: string)
|
||||||
|
{
|
||||||
|
print "node_up", name;
|
||||||
|
schedule 0.1sec { send_any() };
|
||||||
|
}
|
||||||
|
|
||||||
|
event Cluster::node_down(name: string, id: string)
|
||||||
|
{
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
|
||||||
|
# @TEST-START-FILE worker.zeek
|
||||||
|
@load ./common.zeek
|
||||||
|
|
||||||
|
event ping(c: count, what: string, val: any)
|
||||||
|
{
|
||||||
|
print "got ping", c, what, type_name(val), cat(val);
|
||||||
|
Cluster::publish(Cluster::manager_topic, pong, c, what + " (cluster publish)", val);
|
||||||
|
local e = Cluster::make_event(pong, c, what + " (cluster event )", val);
|
||||||
|
Cluster::publish(Cluster::manager_topic, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
event finish()
|
||||||
|
{
|
||||||
|
print "got finish!";
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue