mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 22:58:20 +00:00
Expand Broker APIs to allow tracking directionality of peering establishment
This provides ways to figure out for a given peer, or a given address/port pair,
whether the local node originally established the peering.
(cherry picked from commit b430d5235c
)
This commit is contained in:
parent
458b887df1
commit
4372cdfe2a
9 changed files with 123 additions and 1 deletions
|
@ -244,6 +244,10 @@ export {
|
||||||
type PeerInfo: record {
|
type PeerInfo: record {
|
||||||
peer: EndpointInfo;
|
peer: EndpointInfo;
|
||||||
status: PeerStatus;
|
status: PeerStatus;
|
||||||
|
|
||||||
|
## Whether the local node created the peering, as opposed to a
|
||||||
|
## remote establishing it by connecting to us.
|
||||||
|
is_outbound: bool;
|
||||||
};
|
};
|
||||||
|
|
||||||
type PeerInfos: vector of PeerInfo;
|
type PeerInfos: vector of PeerInfo;
|
||||||
|
@ -347,6 +351,16 @@ export {
|
||||||
## TODO: We do not have a function yet to terminate a connection.
|
## TODO: We do not have a function yet to terminate a connection.
|
||||||
global unpeer: function(a: string, p: port): bool;
|
global unpeer: function(a: string, p: port): bool;
|
||||||
|
|
||||||
|
## Whether the local node originally initiated the peering with the
|
||||||
|
## given endpoint.
|
||||||
|
##
|
||||||
|
## a: the address used in previous successful call to :zeek:see:`Broker::peer`.
|
||||||
|
##
|
||||||
|
## p: the port used in previous successful call to :zeek:see:`Broker::peer`.
|
||||||
|
##
|
||||||
|
## Returns:: True if this node initiated the peering.
|
||||||
|
global is_outbound_peering: function(a: string, p: port): bool;
|
||||||
|
|
||||||
## Get a list of all peer connections.
|
## Get a list of all peer connections.
|
||||||
##
|
##
|
||||||
## Returns: a list of all peer connections.
|
## Returns: a list of all peer connections.
|
||||||
|
@ -508,6 +522,11 @@ function unpeer(a: string, p: port): bool
|
||||||
return __unpeer(a, p);
|
return __unpeer(a, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function is_outbound_peering(a: string, p: port): bool
|
||||||
|
{
|
||||||
|
return __is_outbound_peering(a, p);
|
||||||
|
}
|
||||||
|
|
||||||
function peers(): vector of PeerInfo
|
function peers(): vector of PeerInfo
|
||||||
{
|
{
|
||||||
return __peers();
|
return __peers();
|
||||||
|
|
|
@ -388,6 +388,7 @@ public:
|
||||||
broker::endpoint endpoint;
|
broker::endpoint endpoint;
|
||||||
broker::subscriber subscriber;
|
broker::subscriber subscriber;
|
||||||
PeerBufferStatePtr peerBufferState;
|
PeerBufferStatePtr peerBufferState;
|
||||||
|
std::unordered_set<broker::network_info> outbound_peerings;
|
||||||
};
|
};
|
||||||
|
|
||||||
const broker::endpoint_info Manager::NoPeer{{}, {}};
|
const broker::endpoint_info Manager::NoPeer{{}, {}};
|
||||||
|
@ -704,6 +705,7 @@ void Manager::Peer(const string& addr, uint16_t port, double retry) {
|
||||||
|
|
||||||
auto secs = broker::timeout::seconds(static_cast<uint64_t>(retry));
|
auto secs = broker::timeout::seconds(static_cast<uint64_t>(retry));
|
||||||
bstate->endpoint.peer_nosync(addr, port, secs);
|
bstate->endpoint.peer_nosync(addr, port, secs);
|
||||||
|
bstate->outbound_peerings.emplace(broker::network_info(addr, port));
|
||||||
|
|
||||||
auto counts_as_iosource = get_option("Broker::peer_counts_as_iosource")->AsBool();
|
auto counts_as_iosource = get_option("Broker::peer_counts_as_iosource")->AsBool();
|
||||||
|
|
||||||
|
@ -735,6 +737,15 @@ void Manager::Unpeer(const string& addr, uint16_t port) {
|
||||||
|
|
||||||
FlushLogBuffers();
|
FlushLogBuffers();
|
||||||
bstate->endpoint.unpeer_nosync(addr, port);
|
bstate->endpoint.unpeer_nosync(addr, port);
|
||||||
|
bstate->outbound_peerings.erase(broker::network_info(addr, port));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Manager::IsOutboundPeering(const string& addr, uint16_t port) const {
|
||||||
|
return bstate->outbound_peerings.find(broker::network_info(addr, port)) != bstate->outbound_peerings.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Manager::IsOutboundPeering(const broker::network_info& ni) const {
|
||||||
|
return bstate->outbound_peerings.find(ni) != bstate->outbound_peerings.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<broker::peer_info> Manager::Peers() const {
|
std::vector<broker::peer_info> Manager::Peers() const {
|
||||||
|
|
|
@ -150,6 +150,21 @@ public:
|
||||||
*/
|
*/
|
||||||
void Unpeer(const std::string& addr, uint16_t port);
|
void Unpeer(const std::string& addr, uint16_t port);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the local node originally initiated the peering with the
|
||||||
|
* given endpoint.
|
||||||
|
* @param addr the address used in zeek::Broker::Manager::Peer().
|
||||||
|
* @param port the port used in zeek::Broker::Manager::Peer().
|
||||||
|
*/
|
||||||
|
bool IsOutboundPeering(const std::string& addr, uint16_t port) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether the local node originally initiated the peering with the
|
||||||
|
* given endpoint.
|
||||||
|
* @param ni the address and port used in zeek::Broker::Manager::Peer().
|
||||||
|
*/
|
||||||
|
bool IsOutboundPeering(const broker::network_info& ni) const;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a list of peer endpoints.
|
* @return a list of peer endpoints.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -194,6 +194,12 @@ function Broker::__unpeer%(a: string, p: port%): bool
|
||||||
return zeek::val_mgr->True();
|
return zeek::val_mgr->True();
|
||||||
%}
|
%}
|
||||||
|
|
||||||
|
function Broker::__is_outbound_peering%(a: string, p: port%): bool
|
||||||
|
%{
|
||||||
|
zeek::Broker::Manager::ScriptScopeGuard ssg;
|
||||||
|
return zeek::val_mgr->Bool(broker_mgr->IsOutboundPeering(a->CheckString(), p->Port()));
|
||||||
|
%}
|
||||||
|
|
||||||
function Broker::__peers%(%): PeerInfos
|
function Broker::__peers%(%): PeerInfos
|
||||||
%{
|
%{
|
||||||
zeek::Broker::Manager::ScriptScopeGuard ssg;
|
zeek::Broker::Manager::ScriptScopeGuard ssg;
|
||||||
|
@ -228,6 +234,15 @@ function Broker::__peers%(%): PeerInfos
|
||||||
peer_info->Assign(0, std::move(endpoint_info));
|
peer_info->Assign(0, std::move(endpoint_info));
|
||||||
peer_info->Assign(1, zeek::BifType::Enum::Broker::PeerStatus->GetEnumVal(ps));
|
peer_info->Assign(1, zeek::BifType::Enum::Broker::PeerStatus->GetEnumVal(ps));
|
||||||
|
|
||||||
|
// Broker has an existing concept of peer flags, see the broker::peer_info
|
||||||
|
// and broker::peer_flags structs. They currently aren't currently, but
|
||||||
|
// we can update the following logic once they are.
|
||||||
|
|
||||||
|
if ( p.peer.network.has_value() )
|
||||||
|
peer_info->Assign(2, zeek::val_mgr->Bool(broker_mgr->IsOutboundPeering(p.peer.network.value())));
|
||||||
|
else
|
||||||
|
peer_info->Assign(2, zeek::val_mgr->False());
|
||||||
|
|
||||||
rval->Assign(i, std::move(peer_info));
|
rval->Assign(i, std::move(peer_info));
|
||||||
++i;
|
++i;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
534 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
|
535 seen BiFs, 0 unseen BiFs (), 0 new BiFs ()
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
peered, this is the outgoing peering: T
|
||||||
|
via Broker::peers(): T
|
||||||
|
after unpeering: F
|
|
@ -0,0 +1,3 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
peered, this is the outgoing peering: F
|
||||||
|
via Broker::peers(): F
|
54
testing/btest/broker/peering-directionality.zeek
Normal file
54
testing/btest/broker/peering-directionality.zeek
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
# This tests whether the script-layer can correctly query if a given Broker
|
||||||
|
# peering originated from the local node or from another node that peered with it.
|
||||||
|
#
|
||||||
|
# @TEST-GROUP: broker
|
||||||
|
# @TEST-PORT: BROKER_PORT
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-run client "zeek -b ../client.zeek >out"
|
||||||
|
# @TEST-EXEC: btest-bg-run server "zeek -b ../server.zeek >out"
|
||||||
|
#
|
||||||
|
# @TEST-EXEC: btest-bg-wait 15
|
||||||
|
# @TEST-EXEC: btest-diff client/out
|
||||||
|
# @TEST-EXEC: btest-diff server/out
|
||||||
|
|
||||||
|
# @TEST-START-FILE client.zeek
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Broker::subscribe("zeek/event/my_topic");
|
||||||
|
Broker::peer("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("peered, this is the outgoing peering: %s",
|
||||||
|
Broker::is_outbound_peering(ep$network$address, ep$network$bound_port));
|
||||||
|
print fmt("via Broker::peers(): %s", Broker::peers()[0]$is_outbound);
|
||||||
|
|
||||||
|
Broker::unpeer("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
|
||||||
|
print fmt("after unpeering: %s",
|
||||||
|
Broker::is_outbound_peering(ep$network$address, ep$network$bound_port));
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
||||||
|
|
||||||
|
|
||||||
|
# @TEST-START-FILE server.zeek
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Broker::subscribe("zeek/event/my_topic");
|
||||||
|
Broker::listen("127.0.0.1", to_port(getenv("BROKER_PORT")));
|
||||||
|
}
|
||||||
|
|
||||||
|
event Broker::peer_added(ep: Broker::EndpointInfo, msg: string)
|
||||||
|
{
|
||||||
|
print fmt("peered, this is the outgoing peering: %s",
|
||||||
|
Broker::is_outbound_peering(ep$network$address, ep$network$bound_port));
|
||||||
|
print fmt("via Broker::peers(): %s", Broker::peers()[0]$is_outbound);
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
# @TEST-END-FILE
|
|
@ -36,6 +36,7 @@ global known_BiFs = set(
|
||||||
"Broker::__insert_into_set",
|
"Broker::__insert_into_set",
|
||||||
"Broker::__insert_into_table",
|
"Broker::__insert_into_table",
|
||||||
"Broker::__is_closed",
|
"Broker::__is_closed",
|
||||||
|
"Broker::__is_outbound_peering",
|
||||||
"Broker::__keys",
|
"Broker::__keys",
|
||||||
"Broker::__listen",
|
"Broker::__listen",
|
||||||
"Broker::__node_id",
|
"Broker::__node_id",
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue