diff --git a/src/broker/messaging.bif b/src/broker/messaging.bif index aba84fe344..96008861de 100644 --- a/src/broker/messaging.bif +++ b/src/broker/messaging.bif @@ -173,83 +173,3 @@ function Broker::__unsubscribe%(topic_prefix: string%): bool auto rval = zeek::broker_mgr->Unsubscribe(topic_prefix->CheckString()); return zeek::val_mgr->Bool(rval); %} - -module Cluster; - -type Cluster::Pool: record; - -## Publishes an event to a node within a pool according to Round-Robin -## distribution strategy. -## -## pool: the pool of nodes that are eligible to receive the event. -## -## key: an arbitrary string to identify the purpose for which you're -## distributing the event. e.g. consider using namespacing of your -## script like "Intel::cluster_rr_key". -## -## args: Either the event arguments as already made by -## :zeek:see:`Broker::make_event` or the argument list to pass along -## to it. -## -## Returns: true if the message is sent. -function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool - %{ - static zeek::Func* topic_func = nullptr; - - if ( ! topic_func ) - topic_func = zeek::detail::global_scope()->Find("Cluster::rr_topic")->GetVal()->AsFunc(); - - if ( ! is_cluster_pool(pool) ) - { - zeek::emit_builtin_error("expected type Cluster::Pool for pool"); - return zeek::val_mgr->False(); - } - - zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}}; - auto topic = topic_func->Invoke(&vl); - - if ( ! topic->AsString()->Len() ) - return zeek::val_mgr->False(); - - auto rval = publish_event_args(ArgsSpan{*@ARGS@}.subspan(2), - topic->AsString(), frame); - return zeek::val_mgr->Bool(rval); - %} - - -## Publishes an event to a node within a pool according to Rendezvous -## (Highest Random Weight) hashing strategy. -## -## pool: the pool of nodes that are eligible to receive the event. -## -## key: data used for input to the hashing function that will uniformly -## distribute keys among available nodes. -## -## args: Either the event arguments as already made by -## :zeek:see:`Broker::make_event` or the argument list to pass along -## to it. -## -## Returns: true if the message is sent. -function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool - %{ - static zeek::Func* topic_func = nullptr; - - if ( ! topic_func ) - topic_func = zeek::detail::global_scope()->Find("Cluster::hrw_topic")->GetVal()->AsFunc(); - - if ( ! is_cluster_pool(pool) ) - { - zeek::emit_builtin_error("expected type Cluster::Pool for pool"); - return zeek::val_mgr->False(); - } - - zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}}; - auto topic = topic_func->Invoke(&vl); - - if ( ! topic->AsString()->Len() ) - return zeek::val_mgr->False(); - - auto rval = publish_event_args(ArgsSpan{*@ARGS@}.subspan(2), - topic->AsString(), frame); - return zeek::val_mgr->Bool(rval); - %} diff --git a/src/cluster/BifSupport.cc b/src/cluster/BifSupport.cc index 40e62ed29f..871a2dfc2d 100644 --- a/src/cluster/BifSupport.cc +++ b/src/cluster/BifSupport.cc @@ -136,4 +136,13 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) { zeek::obj_desc_short(args[0]->GetType().get()).c_str())); return zeek::val_mgr->False(); } + +bool is_cluster_pool(const zeek::Val* pool) { + static zeek::RecordTypePtr pool_type = nullptr; + + if ( ! pool_type ) + pool_type = zeek::id::find_type("Cluster::Pool"); + + return pool->GetType() == pool_type; +} } // namespace zeek::cluster::detail::bif diff --git a/src/cluster/BifSupport.h b/src/cluster/BifSupport.h index 2434482796..b02fc97bd0 100644 --- a/src/cluster/BifSupport.h +++ b/src/cluster/BifSupport.h @@ -44,6 +44,8 @@ zeek::RecordValPtr make_event(zeek::ArgsSpan args); */ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args); +bool is_cluster_pool(const zeek::Val* pool); + } // namespace cluster::detail::bif } // namespace zeek diff --git a/src/cluster/cluster.bif b/src/cluster/cluster.bif index cdbe5edf9d..2ec1e81c92 100644 --- a/src/cluster/cluster.bif +++ b/src/cluster/cluster.bif @@ -69,3 +69,81 @@ function Cluster::Backend::__init%(%): bool auto rval = zeek::cluster::backend->Init(); return zeek::val_mgr->Bool(rval); %} + +type Cluster::Pool: record; + +## Publishes an event to a node within a pool according to Round-Robin +## distribution strategy. +## +## pool: the pool of nodes that are eligible to receive the event. +## +## key: an arbitrary string to identify the purpose for which you're +## distributing the event. e.g. consider using namespacing of your +## script like "Intel::cluster_rr_key". +## +## args: Either the event arguments as already made by +## :zeek:see:`Cluster::make_event` or the argument list to pass along +## to it. +## +## Returns: true if the message is sent. +function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool + %{ + static zeek::Func* topic_func = nullptr; + + if ( ! topic_func ) + topic_func = zeek::detail::global_scope()->Find("Cluster::rr_topic")->GetVal()->AsFunc(); + + if ( ! is_cluster_pool(pool) ) + { + zeek::emit_builtin_error("expected type Cluster::Pool for pool"); + return zeek::val_mgr->False(); + } + + zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}}; + auto topic = topic_func->Invoke(&vl); + + if ( ! topic->AsString()->Len() ) + return zeek::val_mgr->False(); + + auto args = zeek::ArgsSpan{*@ARGS@}.subspan(2); + return publish_event(topic, args); + %} + + +## Publishes an event to a node within a pool according to Rendezvous +## (Highest Random Weight) hashing strategy. +## +## pool: the pool of nodes that are eligible to receive the event. +## +## key: data used for input to the hashing function that will uniformly +## distribute keys among available nodes. +## +## args: Either the event arguments as already made by +## :zeek:see:`Broker::make_event` or the argument list to pass along +## to it. +## +## Returns: true if the message is sent. +function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool + %{ + static zeek::Func* topic_func = nullptr; + + if ( ! topic_func ) + topic_func = zeek::detail::global_scope()->Find("Cluster::hrw_topic")->GetVal()->AsFunc(); + + if ( ! is_cluster_pool(pool) ) + { + zeek::emit_builtin_error("expected type Cluster::Pool for pool"); + return zeek::val_mgr->False(); + } + + zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}}; + auto topic = topic_func->Invoke(&vl); + + if ( ! topic->AsString()->Len() ) + return zeek::val_mgr->False(); + + auto args = zeek::ArgsSpan{*@ARGS@}.subspan(2); + + ScriptLocationScope scope{frame}; + return publish_event(topic, args); + %} diff --git a/testing/btest/Baseline/cluster.generic.cluster-publish-errors/.stderr b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/.stderr new file mode 100644 index 0000000000..72b4cada12 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/.stderr @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/cluster-publish-errors.zeek, line 58: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish(topic, Cluster::MyEvent())) +error in <...>/cluster-publish-errors.zeek, line 65: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::MyEvent())) +error in <...>/cluster-publish-errors.zeek, line 72: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::MyEvent())) diff --git a/testing/btest/Baseline/cluster.generic.cluster-publish-errors/.stdout b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/.stdout new file mode 100644 index 0000000000..53ade358c6 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/.stdout @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Broker::make_event with Cluster::publish() +r=, T +Broker::make_event with Cluster::publish_hrw() +r=, T +Broker::make_event with Cluster::publish_rr() +r=, T +Cluster::publish() with wrong event +r=, F +Cluster::publish_hrw() with wrong event +r=, F +Cluster::publish_rr() with wrong event +r=, F diff --git a/testing/btest/Baseline/cluster.generic.cluster-publish-errors/zeromq.err b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/zeromq.err new file mode 100644 index 0000000000..551c87d12e --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/zeromq.err @@ -0,0 +1,7 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +error in <...>/cluster-publish-errors.zeek, line 30: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish(topic, Cluster::be)) +error in <...>/cluster-publish-errors.zeek, line 39: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::be)) +error in <...>/cluster-publish-errors.zeek, line 47: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::be)) +error in <...>/cluster-publish-errors.zeek, line 58: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish(topic, Cluster::MyEvent())) +error in <...>/cluster-publish-errors.zeek, line 65: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::MyEvent())) +error in <...>/cluster-publish-errors.zeek, line 72: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::MyEvent())) diff --git a/testing/btest/Baseline/cluster.generic.cluster-publish-errors/zeromq.out b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/zeromq.out new file mode 100644 index 0000000000..5c368f2c83 --- /dev/null +++ b/testing/btest/Baseline/cluster.generic.cluster-publish-errors/zeromq.out @@ -0,0 +1,13 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Broker::make_event with Cluster::publish() +r=, F +Broker::make_event with Cluster::publish_hrw() +r=, F +Broker::make_event with Cluster::publish_rr() +r=, F +Cluster::publish() with wrong event +r=, F +Cluster::publish_hrw() with wrong event +r=, F +Cluster::publish_rr() with wrong event +r=, F diff --git a/testing/btest/cluster/generic/cluster-publish-errors.zeek b/testing/btest/cluster/generic/cluster-publish-errors.zeek new file mode 100644 index 0000000000..5e106012a0 --- /dev/null +++ b/testing/btest/cluster/generic/cluster-publish-errors.zeek @@ -0,0 +1,74 @@ +# @TEST-DOC: Test errors of cluster bifs +# +# @TEST-EXEC: zeek --parse-only -b %INPUT +# @TEST-EXEC: zeek -b %INPUT +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr +# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout + +module Cluster; + +event ping1(c: count, how: string) &is_used + { + } + +hook hook1(c: count, how: string) &is_used + { + } + + +event zeek_init() + { + # Fake the pool! + init_pool_node(Cluster::proxy_pool, "proxy-1"); + mark_pool_node_alive(Cluster::proxy_pool, "proxy-1"); + } + +event zeek_init() &priority=-1 + { + print "Broker::make_event with Cluster::publish()"; + local be = Broker::make_event(ping1, 1, "make_event()"); + local r = Cluster::publish("topic", be); + print "r=", r; + } + +event zeek_init() &priority=-2 + { + print "Broker::make_event with Cluster::publish_hrw()"; + + local be = Broker::make_event(ping1, 1, "make_event()"); + local r = Cluster::publish_hrw(Cluster::proxy_pool, "key", be); + print "r=", r; + } + +event zeek_init() &priority=-3 + { + print "Broker::make_event with Cluster::publish_rr()"; + local be = Broker::make_event(ping1, 1, "make_event()"); + local r = Cluster::publish_rr(Cluster::proxy_pool, "key", be); + print "r=", r; + } + +type MyEvent: record { + x: count &default=1; +}; + +event zeek_init() &priority=-4 + { + print "Cluster::publish() with wrong event"; + local r = Cluster::publish("topic", MyEvent()); + print "r=", r; + } + +event zeek_init() &priority=-4 + { + print "Cluster::publish_hrw() with wrong event"; + local r = Cluster::publish_hrw(Cluster::proxy_pool, "key", MyEvent()); + print "r=", r; + } + +event zeek_init() &priority=-4 + { + print "Cluster::publish_rr() with wrong event"; + local r = Cluster::publish_rr(Cluster::proxy_pool, "key", MyEvent()); + print "r=", r; + }