Change notices to be processed on worker.

In the past they were processed on the manager - which requires big
records to be sent around.

This has a potential of incompatibilities if someone relied on global
state for notice processing.

GH-214
This commit is contained in:
Johanna Amann 2019-06-25 13:29:41 -07:00
parent f810de11fa
commit 3ec9fb0f7f
4 changed files with 98 additions and 91 deletions

5
NEWS
View file

@ -338,6 +338,11 @@ Changed Functionality
passed to any other functions for further processing. The remainder of the passed to any other functions for further processing. The remainder of the
``ocsp_response_bytes`` is unchanged. ``ocsp_response_bytes`` is unchanged.
- For performance reasons, procesing on notices is now always performed on
the node on which the notice is raised. This is in difference to earlier
versions of Zeek, in which notices always were first sent to the manager
and processed there.
Removed Functionality Removed Functionality
--------------------- ---------------------

View file

@ -273,6 +273,18 @@ export {
## identifier: The identifier string of the notice that should be suppressed. ## identifier: The identifier string of the notice that should be suppressed.
global begin_suppression: event(ts: time, suppress_for: interval, note: Type, identifier: string); global begin_suppression: event(ts: time, suppress_for: interval, note: Type, identifier: string);
## This is an internal event that is used to broadcast the begin_suppression
## event over a cluster.
##
## ts: time indicating then when the notice to be suppressed occured.
##
## suppress_for: length of time that this notice should be suppressed.
##
## note: The :zeek:type:`Notice::Type` of the notice.
##
## identifier: The identifier string of the notice that should be suppressed.
global manager_begin_suppression: event(ts: time, suppress_for: interval, note: Type, identifier: string);
## A function to determine if an event is supposed to be suppressed. ## A function to determine if an event is supposed to be suppressed.
## ##
## n: The record containing the notice in question. ## n: The record containing the notice in question.
@ -314,17 +326,8 @@ export {
## rec: The record containing notice data before it is logged. ## rec: The record containing notice data before it is logged.
global log_notice: event(rec: Info); global log_notice: event(rec: Info);
## This is an internal wrapper for the global :zeek:id:`NOTICE` ## This is an internal function to populate policy records.
## function; disregard. global apply_policy: function(n: Notice::Info);
##
## n: The record of notice data.
global internal_NOTICE: function(n: Notice::Info);
## This is the event used to transport notices on the cluster.
##
## n: The notice information to be sent to the cluster manager for
## further processing.
global cluster_notice: event(n: Notice::Info);
} }
module GLOBAL; module GLOBAL;
@ -334,17 +337,11 @@ function NOTICE(n: Notice::Info)
if ( Notice::is_being_suppressed(n) ) if ( Notice::is_being_suppressed(n) )
return; return;
@if ( Cluster::is_enabled() ) # Fill out fields that might be empty and do the policy processing.
if ( Cluster::local_node_type() == Cluster::MANAGER ) Notice::apply_policy(n);
Notice::internal_NOTICE(n);
else # Generate the notice event with the notice.
{ hook Notice::notice(n);
n$peer_name = n$peer_descr = Cluster::node;
Broker::publish(Cluster::manager_topic, Notice::cluster_notice, n);
}
@else
Notice::internal_NOTICE(n);
@endif
} }
module Notice; module Notice;
@ -521,6 +518,9 @@ hook Notice::notice(n: Notice::Info) &priority=-5
n$suppress_for != 0secs ) n$suppress_for != 0secs )
{ {
event Notice::begin_suppression(n$ts, n$suppress_for, n$note, n$identifier); event Notice::begin_suppression(n$ts, n$suppress_for, n$note, n$identifier);
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
event Notice::manager_begin_suppression(n$ts, n$suppress_for, n$note, n$identifier);
@endif
} }
} }
@ -531,15 +531,27 @@ event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type,
suppressing[note, identifier] = suppress_until; suppressing[note, identifier] = suppress_until;
} }
@if ( Cluster::is_enabled() && Cluster::local_node_type() == Cluster::MANAGER )
event zeek_init() event zeek_init()
{ {
if ( ! Cluster::is_enabled() )
return;
Broker::auto_publish(Cluster::worker_topic, Notice::begin_suppression); Broker::auto_publish(Cluster::worker_topic, Notice::begin_suppression);
Broker::auto_publish(Cluster::proxy_topic, Notice::begin_suppression); Broker::auto_publish(Cluster::proxy_topic, Notice::begin_suppression);
} }
event Notice::manager_begin_suppression(ts: time, suppress_for: interval, note: Type,
identifier: string)
{
event Notice::begin_suppression(ts, suppress_for, note, identifier);
}
@endif
@if ( Cluster::is_enabled() && Cluster::local_node_type() != Cluster::MANAGER )
event zeek_init()
{
Broker::auto_publish(Cluster::manager_topic, Notice::manager_begin_suppression);
}
@endif
function is_being_suppressed(n: Notice::Info): bool function is_being_suppressed(n: Notice::Info): bool
{ {
if ( n?$identifier && [n$note, n$identifier] in suppressing ) if ( n?$identifier && [n$note, n$identifier] in suppressing )
@ -605,6 +617,14 @@ function apply_policy(n: Notice::Info)
if ( ! n?$ts ) if ( ! n?$ts )
n$ts = network_time(); n$ts = network_time();
@if ( Cluster::is_enabled() )
if ( ! n?$peer_name )
n$peer_name = Cluster::node;
if ( ! n?$peer_descr )
n$peer_descr = Cluster::node;
@endif
if ( n?$f ) if ( n?$f )
populate_file_info(n$f, n); populate_file_info(n$f, n);
@ -652,28 +672,4 @@ function apply_policy(n: Notice::Info)
# suppression interval given yet, the default is applied. # suppression interval given yet, the default is applied.
if ( ! n?$suppress_for ) if ( ! n?$suppress_for )
n$suppress_for = default_suppression_interval; n$suppress_for = default_suppression_interval;
# Delete the connection and file records if they're there so we
# aren't sending that to remote machines. It can cause problems
# due to the size of those records.
if ( n?$conn )
delete n$conn;
if ( n?$iconn )
delete n$iconn;
if ( n?$f )
delete n$f;
}
function internal_NOTICE(n: Notice::Info)
{
# Fill out fields that might be empty and do the policy processing.
apply_policy(n);
# Generate the notice event with the notice.
hook Notice::notice(n);
}
event Notice::cluster_notice(n: Notice::Info)
{
NOTICE(n);
} }

View file

@ -33,9 +33,15 @@ event delayed_notice()
NOTICE([$note=Test_Notice, $msg="test notice!"]); NOTICE([$note=Test_Notice, $msg="test notice!"]);
} }
event terminate_me()
{
terminate();
}
event ready() event ready()
{ {
schedule 1secs { delayed_notice() }; schedule 1secs { delayed_notice() };
schedule 2secs { terminate_me() };
} }
@if ( Cluster::local_node_type() == Cluster::MANAGER ) @if ( Cluster::local_node_type() == Cluster::MANAGER )
@ -50,7 +56,7 @@ event Cluster::node_up(name: string, id: string)
Broker::publish(Cluster::worker_topic, ready); Broker::publish(Cluster::worker_topic, ready);
} }
event Notice::log_notice(rec: Notice::Info) event Cluster::node_down(name: string, id: string)
{ {
terminate(); terminate();
} }