cluster: Move publish_hrw() and publish_rr() to cluster.bif

From this point on, Cluster::publish_hrw() and Cluster::publish_rr()
go through cluster/Backend.cc code.
This commit is contained in:
Arne Welzel 2024-11-15 13:54:20 +01:00
parent 79a71357c7
commit 210b54799e
9 changed files with 200 additions and 80 deletions

View file

@ -173,83 +173,3 @@ function Broker::__unsubscribe%(topic_prefix: string%): bool
auto rval = zeek::broker_mgr->Unsubscribe(topic_prefix->CheckString());
return zeek::val_mgr->Bool(rval);
%}
module Cluster;
type Cluster::Pool: record;
## Publishes an event to a node within a pool according to Round-Robin
## distribution strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: an arbitrary string to identify the purpose for which you're
## distributing the event. e.g. consider using namespacing of your
## script like "Intel::cluster_rr_key".
##
## args: Either the event arguments as already made by
## :zeek:see:`Broker::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::rr_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto rval = publish_event_args(ArgsSpan{*@ARGS@}.subspan(2),
topic->AsString(), frame);
return zeek::val_mgr->Bool(rval);
%}
## Publishes an event to a node within a pool according to Rendezvous
## (Highest Random Weight) hashing strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: data used for input to the hashing function that will uniformly
## distribute keys among available nodes.
##
## args: Either the event arguments as already made by
## :zeek:see:`Broker::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::hrw_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto rval = publish_event_args(ArgsSpan{*@ARGS@}.subspan(2),
topic->AsString(), frame);
return zeek::val_mgr->Bool(rval);
%}

View file

@ -136,4 +136,13 @@ zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args) {
zeek::obj_desc_short(args[0]->GetType().get()).c_str()));
return zeek::val_mgr->False();
}
bool is_cluster_pool(const zeek::Val* pool) {
static zeek::RecordTypePtr pool_type = nullptr;
if ( ! pool_type )
pool_type = zeek::id::find_type<zeek::RecordType>("Cluster::Pool");
return pool->GetType() == pool_type;
}
} // namespace zeek::cluster::detail::bif

View file

@ -44,6 +44,8 @@ zeek::RecordValPtr make_event(zeek::ArgsSpan args);
*/
zeek::ValPtr publish_event(const zeek::ValPtr& topic, zeek::ArgsSpan args);
bool is_cluster_pool(const zeek::Val* pool);
} // namespace cluster::detail::bif
} // namespace zeek

View file

@ -69,3 +69,81 @@ function Cluster::Backend::__init%(%): bool
auto rval = zeek::cluster::backend->Init();
return zeek::val_mgr->Bool(rval);
%}
type Cluster::Pool: record;
## Publishes an event to a node within a pool according to Round-Robin
## distribution strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: an arbitrary string to identify the purpose for which you're
## distributing the event. e.g. consider using namespacing of your
## script like "Intel::cluster_rr_key".
##
## args: Either the event arguments as already made by
## :zeek:see:`Cluster::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_rr%(pool: Pool, key: string, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::rr_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto args = zeek::ArgsSpan{*@ARGS@}.subspan(2);
return publish_event(topic, args);
%}
## Publishes an event to a node within a pool according to Rendezvous
## (Highest Random Weight) hashing strategy.
##
## pool: the pool of nodes that are eligible to receive the event.
##
## key: data used for input to the hashing function that will uniformly
## distribute keys among available nodes.
##
## args: Either the event arguments as already made by
## :zeek:see:`Broker::make_event` or the argument list to pass along
## to it.
##
## Returns: true if the message is sent.
function Cluster::publish_hrw%(pool: Pool, key: any, ...%): bool
%{
static zeek::Func* topic_func = nullptr;
if ( ! topic_func )
topic_func = zeek::detail::global_scope()->Find("Cluster::hrw_topic")->GetVal()->AsFunc();
if ( ! is_cluster_pool(pool) )
{
zeek::emit_builtin_error("expected type Cluster::Pool for pool");
return zeek::val_mgr->False();
}
zeek::Args vl{{zeek::NewRef{}, pool}, {zeek::NewRef{}, key}};
auto topic = topic_func->Invoke(&vl);
if ( ! topic->AsString()->Len() )
return zeek::val_mgr->False();
auto args = zeek::ArgsSpan{*@ARGS@}.subspan(2);
ScriptLocationScope scope{frame};
return publish_event(topic, args);
%}

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/cluster-publish-errors.zeek, line 58: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish(topic, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 65: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 72: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::MyEvent()))

View file

@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Broker::make_event with Cluster::publish()
r=, T
Broker::make_event with Cluster::publish_hrw()
r=, T
Broker::make_event with Cluster::publish_rr()
r=, T
Cluster::publish() with wrong event
r=, F
Cluster::publish_hrw() with wrong event
r=, F
Cluster::publish_rr() with wrong event
r=, F

View file

@ -0,0 +1,7 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
error in <...>/cluster-publish-errors.zeek, line 30: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish(topic, Cluster::be))
error in <...>/cluster-publish-errors.zeek, line 39: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::be))
error in <...>/cluster-publish-errors.zeek, line 47: Publish of Broker::Event record instance with type 'Broker::Event' to a non-Broker backend (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::be))
error in <...>/cluster-publish-errors.zeek, line 58: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish(topic, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 65: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_hrw(Cluster::proxy_pool, key, Cluster::MyEvent()))
error in <...>/cluster-publish-errors.zeek, line 72: Publish of unknown record type 'Cluster::MyEvent' (Cluster::publish_rr(Cluster::proxy_pool, key, Cluster::MyEvent()))

View file

@ -0,0 +1,13 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Broker::make_event with Cluster::publish()
r=, F
Broker::make_event with Cluster::publish_hrw()
r=, F
Broker::make_event with Cluster::publish_rr()
r=, F
Cluster::publish() with wrong event
r=, F
Cluster::publish_hrw() with wrong event
r=, F
Cluster::publish_rr() with wrong event
r=, F

View file

@ -0,0 +1,74 @@
# @TEST-DOC: Test errors of cluster bifs
#
# @TEST-EXEC: zeek --parse-only -b %INPUT
# @TEST-EXEC: zeek -b %INPUT
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stderr
# @TEST-EXEC: TEST_DIFF_CANONIFIER=$SCRIPTS/diff-remove-abspath btest-diff .stdout
module Cluster;
event ping1(c: count, how: string) &is_used
{
}
hook hook1(c: count, how: string) &is_used
{
}
event zeek_init()
{
# Fake the pool!
init_pool_node(Cluster::proxy_pool, "proxy-1");
mark_pool_node_alive(Cluster::proxy_pool, "proxy-1");
}
event zeek_init() &priority=-1
{
print "Broker::make_event with Cluster::publish()";
local be = Broker::make_event(ping1, 1, "make_event()");
local r = Cluster::publish("topic", be);
print "r=", r;
}
event zeek_init() &priority=-2
{
print "Broker::make_event with Cluster::publish_hrw()";
local be = Broker::make_event(ping1, 1, "make_event()");
local r = Cluster::publish_hrw(Cluster::proxy_pool, "key", be);
print "r=", r;
}
event zeek_init() &priority=-3
{
print "Broker::make_event with Cluster::publish_rr()";
local be = Broker::make_event(ping1, 1, "make_event()");
local r = Cluster::publish_rr(Cluster::proxy_pool, "key", be);
print "r=", r;
}
type MyEvent: record {
x: count &default=1;
};
event zeek_init() &priority=-4
{
print "Cluster::publish() with wrong event";
local r = Cluster::publish("topic", MyEvent());
print "r=", r;
}
event zeek_init() &priority=-4
{
print "Cluster::publish_hrw() with wrong event";
local r = Cluster::publish_hrw(Cluster::proxy_pool, "key", MyEvent());
print "r=", r;
}
event zeek_init() &priority=-4
{
print "Cluster::publish_rr() with wrong event";
local r = Cluster::publish_rr(Cluster::proxy_pool, "key", MyEvent());
print "r=", r;
}