Merge topic/actor-system throug a squashed commit.

This commit is contained in:
Robin Sommer 2018-05-16 23:48:07 +00:00
parent 7a6f5020f6
commit fe7e1ee7f0
466 changed files with 12559 additions and 9655 deletions

View file

@ -1,2 +1,3 @@
@load ./main
@load ./store
@load ./log

View file

@ -0,0 +1,80 @@
@load ./main
module Broker;
export {
## The Broker logging stream identifier.
redef enum Log::ID += { LOG };
## The type of a Broker activity being logged.
type Type: enum {
## An informational status update.
STATUS,
## An error situation.
ERROR
};
## A record type containing the column fields of the Broker log.
type Info: record {
## The network time at which a Broker event occurred.
ts: time &log;
## The type of the Broker event.
ty: Type &log;
## The event being logged.
ev: string &log;
## The peer (if any) with which a Broker event is
## concerned.
peer: NetworkInfo &log &optional;
## An optional message describing the Broker event in more detail
message: string &log &optional;
};
}
event bro_init() &priority=5
{
Log::create_stream(Broker::LOG, [$columns=Info, $path="broker"]);
}
function log_status(ev: string, endpoint: EndpointInfo, msg: string)
{
local r: Info;
r = [$ts = network_time(),
$ev = ev,
$ty = STATUS,
$message = msg];
if ( endpoint?$network )
r$peer = endpoint$network;
Log::write(Broker::LOG, r);
}
event Broker::peer_added(endpoint: EndpointInfo, msg: string)
{
log_status("peer-added", endpoint, msg);
}
event Broker::peer_removed(endpoint: EndpointInfo, msg: string)
{
log_status("peer-removed", endpoint, msg);
}
event Broker::peer_lost(endpoint: EndpointInfo, msg: string)
{
log_status("connection-terminated", endpoint, msg);
}
event Broker::error(code: ErrorCode, msg: string)
{
local ev = cat(code);
ev = subst_string(ev, "Broker::", "");
ev = subst_string(ev, "_", "-");
ev = to_lower(ev);
Log::write(Broker::LOG, [$ts = network_time(),
$ev = ev,
$ty = ERROR,
$message = msg]);
}

View file

@ -1,55 +1,160 @@
##! Various data structure definitions for use with Bro's communication system.
module Log;
export {
type Log::ID: enum {
## Dummy place-holder.
UNKNOWN
};
}
##! The Broker-based communication API and its various options.
module Broker;
export {
## Default port for Broker communication. Where not specified
## otherwise, this is the port to connect to and listen on.
const default_port = 9999/tcp &redef;
## A name used to identify this endpoint to peers.
## Default interval to retry listening on a port if it's currently in
## use already.
const default_listen_retry = 30sec &redef;
## Default address on which to listen.
##
## .. bro:see:: Broker::connect Broker::listen
const endpoint_name = "" &redef;
## .. bro:see:: Broker::listen
const default_listen_address = "" &redef;
## Change communication behavior.
type EndpointFlags: record {
## Whether to restrict message topics that can be published to peers.
auto_publish: bool &default = T;
## Whether to restrict what message topics or data store identifiers
## the local endpoint advertises to peers (e.g. subscribing to
## events or making a master data store available).
auto_advertise: bool &default = T;
## Default interval to retry connecting to a peer if it cannot be made to work
## initially, or if it ever becomes disconnected.
const default_connect_retry = 30sec &redef;
## If false, do not use SSL for network connections. By default, SSL will even
## be used if no certificates / CAs have been configured. In that case
## (which is the default) the communication will be encrypted, but not
## authenticated.
const disable_ssl = F &redef;
## Path to a file containing concatenated trusted certificates
## in PEM format. If set, Bro will require valid certificates forx
## all peers.
const ssl_cafile = "" &redef;
## Path to an OpenSSL-style directory of trusted certificates.
## If set, Bro will require valid certificates forx
## all peers.
const ssl_capath = "" &redef;
## Path to a file containing a X.509 certificate for this
## node in PEM format. If set, Bro will require valid certificates for
## all peers.
const ssl_certificate = "" &redef;
## Passphrase to decrypt the private key specified by
## :bro:see:`Broker::ssl_keyfile`. If set, Bro will require valid
## certificates for all peers.
const ssl_passphrase = "" &redef;
## Path to the file containing the private key for this node's
## certificate. If set, Bro will require valid certificates for
## all peers.
const ssl_keyfile = "" &redef;
## Forward all received messages to subscribing peers.
const forward_messages = F &redef;
## The default topic prefix where logs will be published. The log's stream
## id is appended when writing to a particular stream.
const default_log_topic_prefix = "bro/logs/" &redef;
## The default implementation for :bro:see:`Broker::log_topic`.
function default_log_topic(id: Log::ID, path: string): string
{
return default_log_topic_prefix + cat(id);
}
## A function that will be called for each log entry to determine what
## broker topic string will be used for sending it to peers. The
## default implementation will return a value based on
## :bro:see:`Broker::default_log_topic_prefix`.
##
## id: the ID associated with the log stream entry that will be sent.
##
## path: the path to which the log stream entry will be output.
##
## Returns: a string representing the broker topic to which the log
## will be sent.
const log_topic: function(id: Log::ID, path: string): string = default_log_topic &redef;
type ErrorCode: enum {
## The unspecified default error code.
UNSPECIFIED = 1,
## Version incompatibility.
PEER_INCOMPATIBLE = 2,
## Referenced peer does not exist.
PEER_INVALID = 3,
## Remote peer not listening.
PEER_UNAVAILABLE = 4,
## An peering request timed out.
PEER_TIMEOUT = 5,
## Master with given name already exist.
MASTER_EXISTS = 6,
## Master with given name does not exist.
NO_SUCH_MASTER = 7,
## The given data store key does not exist.
NO_SUCH_KEY = 8,
## The store operation timed out.
REQUEST_TIMEOUT = 9,
## The operation expected a different type than provided
TYPE_CLASH = 10,
## The data value cannot be used to carry out the desired operation.
INVALID_DATA = 11,
## The storage backend failed to execute the operation.
BACKEND_FAILURE = 12,
## The storage backend failed to execute the operation.
STALE_DATA = 13,
## Catch-all for a CAF-level problem.
CAF_ERROR = 100
};
## Fine-grained tuning of communication behavior for a particular message.
type SendFlags: record {
## Send the message to the local endpoint.
self: bool &default = F;
## Send the message to peer endpoints that advertise interest in
## the topic associated with the message.
peers: bool &default = T;
## Send the message to peer endpoints even if they don't advertise
## interest in the topic associated with the message.
unsolicited: bool &default = F;
## The possible states of a peer endpoint.
type PeerStatus: enum {
## The peering process is initiated.
INITIALIZING,
## Connection establishment in process.
CONNECTING,
## Connection established, peering pending.
CONNECTED,
## Successfully peered.
PEERED,
## Connection to remote peer lost.
DISCONNECTED,
## Reconnecting to peer after a lost connection.
RECONNECTING,
};
type NetworkInfo: record {
## The IP address or hostname where the endpoint listens.
address: string &log;
## The port where the endpoint is bound to.
bound_port: port &log;
};
type EndpointInfo: record {
## A unique identifier of the node.
id: string;
## Network-level information.
network: NetworkInfo &optional;
};
type PeerInfo: record {
peer: EndpointInfo;
status: PeerStatus;
};
type PeerInfos: vector of PeerInfo;
## Opaque communication data.
type Data: record {
d: opaque of Broker::Data &optional;
data: opaque of Broker::Data &optional;
};
## Opaque communication data.
## Opaque communication data sequence.
type DataVector: vector of Broker::Data;
## Opaque event communication data.
type EventArgs: record {
type Event: record {
## The name of the event. Not set if invalid event or arguments.
name: string &optional;
## The arguments to the event.
@ -63,52 +168,23 @@ export {
val: Broker::Data;
};
## Enable use of communication.
##
## flags: used to tune the local Broker endpoint behavior.
##
## Returns: true if communication is successfully initialized.
global enable: function(flags: EndpointFlags &default = EndpointFlags()): bool;
## Changes endpoint flags originally supplied to :bro:see:`Broker::enable`.
##
## flags: the new endpoint behavior flags to use.
##
## Returns: true if flags were changed.
global set_endpoint_flags: function(flags: EndpointFlags &default = EndpointFlags()): bool;
## Allow sending messages to peers if associated with the given topic.
## This has no effect if auto publication behavior is enabled via the flags
## supplied to :bro:see:`Broker::enable` or :bro:see:`Broker::set_endpoint_flags`.
##
## topic: a topic to allow messages to be published under.
##
## Returns: true if successful.
global publish_topic: function(topic: string): bool;
## Disallow sending messages to peers if associated with the given topic.
## This has no effect if auto publication behavior is enabled via the flags
## supplied to :bro:see:`Broker::enable` or :bro:see:`Broker::set_endpoint_flags`.
##
## topic: a topic to disallow messages to be published under.
##
## Returns: true if successful.
global unpublish_topic: function(topic: string): bool;
## Listen for remote connections.
##
## p: the TCP port to listen on.
##
## a: an address string on which to accept connections, e.g.
## "127.0.0.1". An empty string refers to @p INADDR_ANY.
##
## reuse: equivalent to behavior of SO_REUSEADDR.
## p: the TCP port to listen on. The value 0 means that the OS should choose
## the next available free port.
##
## Returns: true if the local endpoint is now listening for connections.
## retry: If non-zero, retries listening in regular intervals if the port cannot be
## acquired immediately. 0 disables retries.
##
## .. bro:see:: Broker::incoming_connection_established
global listen: function(p: port, a: string &default = "", reuse: bool &default = T): bool;
## Returns: the bound port or 0/? on failure.
##
## .. bro:see:: Broker::status
global listen: function(a: string &default = default_listen_address,
p: port &default = default_port,
retry: interval &default = default_listen_retry): port;
## Initiate a remote connection.
##
## a: an address to connect to, e.g. "localhost" or "127.0.0.1".
@ -120,63 +196,66 @@ export {
## if it ever becomes disconnected.
##
## Returns: true if it's possible to try connecting with the peer and
## it's a new peer. The actual connection may not be established
## it's a new peer. The actual connection may not be established
## until a later point in time.
##
## .. bro:see:: Broker::outgoing_connection_established
global connect: function(a: string, p: port, retry: interval): bool;
## .. bro:see:: Broker::status
global peer: function(a: string, p: port &default=default_port,
retry: interval &default=default_connect_retry): bool;
## Remove a remote connection.
##
## a: the address used in previous successful call to :bro:see:`Broker::connect`.
## Note that this does not terminate the connection to the peer, it
## just means that we won't exchange any further information with it
## unless peering resumes later.
##
## p: the port used in previous successful call to :bro:see:`Broker::connect`.
## a: the address used in previous successful call to :bro:see:`Broker::peer`.
##
## p: the port used in previous successful call to :bro:see:`Broker::peer`.
##
## Returns: true if the arguments match a previously successful call to
## :bro:see:`Broker::connect`.
global disconnect: function(a: string, p: port): bool;
## :bro:see:`Broker::peer`.
##
## TODO: We do not have a function yet to terminate a connection.
global unpeer: function(a: string, p: port): bool;
## Print a simple message to any interested peers. The receiver can use
## :bro:see:`Broker::print_handler` to handle messages.
## Returns: a list of all peer connections.
global peers: function(): vector of PeerInfo;
## Returns: a unique identifier for the local broker endpoint.
global node_id: function(): string;
## Sends all pending log messages to remote peers. This normally
## doesn't need to be used except for test cases that are time-sensitive.
global flush_logs: function(): count;
## Publishes the value of an identifier to a given topic. The subscribers
## will update their local value for that identifier on receipt.
##
## topic: a topic associated with the printed message.
## topic: a topic associated with the message.
##
## msg: the print message to send to peers.
##
## flags: tune the behavior of how the message is sent.
## id: the identifier to publish.
##
## Returns: true if the message is sent.
global send_print: function(topic: string, msg: string, flags: SendFlags &default = SendFlags()): bool;
global publish_id: function(topic: string, id: string): bool;
## Register interest in all peer print messages that use a certain topic
## prefix. Use :bro:see:`Broker::print_handler` to handle received
## messages.
## Register interest in all peer event messages that use a certain topic
## prefix.
##
## topic_prefix: a prefix to match against remote message topics.
## e.g. an empty prefix matches everything and "a" matches
## "alice" and "amy" but not "bob".
##
## Returns: true if it's a new print subscription and it is now registered.
global subscribe_to_prints: function(topic_prefix: string): bool;
## Returns: true if it's a new event subscription and it is now registered.
global subscribe: function(topic_prefix: string): bool;
## Unregister interest in all peer print messages that use a topic prefix.
## Unregister interest in all peer event messages that use a topic prefix.
##
## topic_prefix: a prefix previously supplied to a successful call to
## :bro:see:`Broker::subscribe_to_prints`.
## :bro:see:`Broker::subscribe`.
##
## Returns: true if interest in the topic prefix is no longer advertised.
global unsubscribe_to_prints: function(topic_prefix: string): bool;
## Send an event to any interested peers.
##
## topic: a topic associated with the event message.
##
## args: event arguments as made by :bro:see:`Broker::event_args`.
##
## flags: tune the behavior of how the message is sent.
##
## Returns: true if the message is sent.
global send_event: function(topic: string, args: EventArgs, flags: SendFlags &default = SendFlags()): bool;
global unsubscribe: function(topic_prefix: string): bool;
## Automatically send an event to any interested peers whenever it is
## locally dispatched (e.g. using "event my_event(...);" in a script).
@ -187,83 +266,18 @@ export {
##
## ev: a Bro event value.
##
## flags: tune the behavior of how the message is sent.
##
## Returns: true if automatic event sending is now enabled.
global auto_event: function(topic: string, ev: any, flags: SendFlags &default = SendFlags()): bool;
global auto_publish: function(topic: string, ev: any): bool;
## Stop automatically sending an event to peers upon local dispatch.
##
## topic: a topic originally given to :bro:see:`Broker::auto_event`.
## topic: a topic originally given to :bro:see:`Broker::auto_publish`.
##
## ev: an event originally given to :bro:see:`Broker::auto_event`.
## ev: an event originally given to :bro:see:`Broker::auto_publish`.
##
## Returns: true if automatic events will not occur for the topic/event
## pair.
global auto_event_stop: function(topic: string, ev: any): bool;
## Register interest in all peer event messages that use a certain topic
## prefix.
##
## topic_prefix: a prefix to match against remote message topics.
## e.g. an empty prefix matches everything and "a" matches
## "alice" and "amy" but not "bob".
##
## Returns: true if it's a new event subscription and it is now registered.
global subscribe_to_events: function(topic_prefix: string): bool;
## Unregister interest in all peer event messages that use a topic prefix.
##
## topic_prefix: a prefix previously supplied to a successful call to
## :bro:see:`Broker::subscribe_to_events`.
##
## Returns: true if interest in the topic prefix is no longer advertised.
global unsubscribe_to_events: function(topic_prefix: string): bool;
## Enable remote logs for a given log stream.
##
## id: the log stream to enable remote logs for.
##
## flags: tune the behavior of how log entry messages are sent.
##
## Returns: true if remote logs are enabled for the stream.
global enable_remote_logs: function(id: Log::ID, flags: SendFlags &default = SendFlags()): bool;
## Disable remote logs for a given log stream.
##
## id: the log stream to disable remote logs for.
##
## Returns: true if remote logs are disabled for the stream.
global disable_remote_logs: function(id: Log::ID): bool;
## Check if remote logs are enabled for a given log stream.
##
## id: the log stream to check.
##
## Returns: true if remote logs are enabled for the given stream.
global remote_logs_enabled: function(id: Log::ID): bool;
## Register interest in all peer log messages that use a certain topic
## prefix. Logs are implicitly sent with topic "bro/log/<stream-name>" and
## the receiving side processes them through the logging framework as usual.
##
## topic_prefix: a prefix to match against remote message topics.
## e.g. an empty prefix matches everything and "a" matches
## "alice" and "amy" but not "bob".
##
## Returns: true if it's a new log subscription and it is now registered.
global subscribe_to_logs: function(topic_prefix: string): bool;
## Unregister interest in all peer log messages that use a topic prefix.
## Logs are implicitly sent with topic "bro/log/<stream-name>" and the
## receiving side processes them through the logging framework as usual.
##
## topic_prefix: a prefix previously supplied to a successful call to
## :bro:see:`Broker::subscribe_to_logs`.
##
## Returns: true if interest in the topic prefix is no longer advertised.
global unsubscribe_to_logs: function(topic_prefix: string): bool;
global auto_unpublish: function(topic: string, ev: any): bool;
}
@load base/bif/comm.bif
@ -271,106 +285,67 @@ export {
module Broker;
@ifdef ( Broker::__enable )
event retry_listen(a: string, p: port, retry: interval)
{
listen(a, p, retry);
}
function enable(flags: EndpointFlags &default = EndpointFlags()) : bool
{
return __enable(flags);
}
function listen(a: string, p: port, retry: interval): port
{
local bound = __listen(a, p);
function set_endpoint_flags(flags: EndpointFlags &default = EndpointFlags()): bool
{
return __set_endpoint_flags(flags);
}
if ( bound == 0/tcp && retry != 0secs )
schedule retry { retry_listen(a, p, retry) };
function publish_topic(topic: string): bool
{
return __publish_topic(topic);
}
return bound;
}
function unpublish_topic(topic: string): bool
{
return __unpublish_topic(topic);
}
function peer(a: string, p: port, retry: interval): bool
{
return __peer(a, p, retry);
}
function listen(p: port, a: string &default = "", reuse: bool &default = T): bool
{
return __listen(p, a, reuse);
}
function unpeer(a: string, p: port): bool
{
return __unpeer(a, p);
}
function connect(a: string, p: port, retry: interval): bool
{
return __connect(a, p, retry);
}
function peers(): vector of PeerInfo
{
return __peers();
}
function disconnect(a: string, p: port): bool
{
return __disconnect(a, p);
}
function node_id(): string
{
return __node_id();
}
function send_print(topic: string, msg: string, flags: SendFlags &default = SendFlags()): bool
{
return __send_print(topic, msg, flags);
}
function flush_logs(): count
{
return __flush_logs();
}
function subscribe_to_prints(topic_prefix: string): bool
{
return __subscribe_to_prints(topic_prefix);
}
function publish_id(topic: string, id: string): bool
{
return __publish_id(topic, id);
}
function unsubscribe_to_prints(topic_prefix: string): bool
{
return __unsubscribe_to_prints(topic_prefix);
}
function subscribe(topic_prefix: string): bool
{
return __subscribe(topic_prefix);
}
function send_event(topic: string, args: EventArgs, flags: SendFlags &default = SendFlags()): bool
{
return __event(topic, args, flags);
}
function unsubscribe(topic_prefix: string): bool
{
return __unsubscribe(topic_prefix);
}
function auto_event(topic: string, ev: any, flags: SendFlags &default = SendFlags()): bool
{
return __auto_event(topic, ev, flags);
}
function auto_publish(topic: string, ev: any): bool
{
return __auto_publish(topic, ev);
}
function auto_event_stop(topic: string, ev: any): bool
{
return __auto_event_stop(topic, ev);
}
function subscribe_to_events(topic_prefix: string): bool
{
return __subscribe_to_events(topic_prefix);
}
function unsubscribe_to_events(topic_prefix: string): bool
{
return __unsubscribe_to_events(topic_prefix);
}
function enable_remote_logs(id: Log::ID, flags: SendFlags &default = SendFlags()): bool
{
return __enable_remote_logs(id, flags);
}
function disable_remote_logs(id: Log::ID): bool
{
return __disable_remote_logs(id);
}
function remote_logs_enabled(id: Log::ID): bool
{
return __remote_logs_enabled(id);
}
function subscribe_to_logs(topic_prefix: string): bool
{
return __subscribe_to_logs(topic_prefix);
}
function unsubscribe_to_logs(topic_prefix: string): bool
{
return __unsubscribe_to_logs(topic_prefix);
}
@endif
function auto_unpublish(topic: string, ev: any): bool
{
return __auto_unpublish(topic, ev);
}

File diff suppressed because it is too large Load diff

View file

@ -1,11 +1,16 @@
# Load the core cluster support.
@load ./main
@load ./pools
@if ( Cluster::is_enabled() )
# Give the node being started up it's peer name.
redef peer_description = Cluster::node;
@if ( Cluster::enable_round_robin_logging )
redef Broker::log_topic = Cluster::rr_log_topic;
@endif
# Add a cluster prefix.
@prefixes += cluster
@ -19,13 +24,6 @@ redef peer_description = Cluster::node;
@load ./setup-connections
# Don't load the listening script until we're a bit more sure that the
# cluster framework is actually being enabled.
@load frameworks/communication/listen
## Set the port that this node is supposed to listen on.
redef Communication::listen_port = Cluster::nodes[Cluster::node]$p;
@if ( Cluster::local_node_type() == Cluster::MANAGER )
@load ./nodes/manager
# If no logger is defined, then the manager receives logs.

View file

@ -7,10 +7,111 @@
##! ``@load base/frameworks/cluster``.
@load base/frameworks/control
@load base/frameworks/broker
module Cluster;
export {
## Whether to distribute log messages among available logging nodes.
const enable_round_robin_logging = T &redef;
## The topic name used for exchanging general messages that are relevant to
## any node in a cluster. Used with broker-enabled cluster communication.
const broadcast_topic = "bro/cluster/broadcast" &redef;
## The topic name used for exchanging messages that are relevant to
## logger nodes in a cluster. Used with broker-enabled cluster communication.
const logger_topic = "bro/cluster/logger" &redef;
## The topic name used for exchanging messages that are relevant to
## manager nodes in a cluster. Used with broker-enabled cluster communication.
const manager_topic = "bro/cluster/manager" &redef;
## The topic name used for exchanging messages that are relevant to
## proxy nodes in a cluster. Used with broker-enabled cluster communication.
const proxy_topic = "bro/cluster/proxy" &redef;
## The topic name used for exchanging messages that are relevant to
## worker nodes in a cluster. Used with broker-enabled cluster communication.
const worker_topic = "bro/cluster/worker" &redef;
## The topic name used for exchanging messages that are relevant to
## time machine nodes in a cluster. Used with broker-enabled cluster communication.
const time_machine_topic = "bro/cluster/time_machine" &redef;
## The topic prefix used for exchanging messages that are relevant to
## a named node in a cluster. Used with broker-enabled cluster communication.
const node_topic_prefix = "bro/cluster/node/" &redef;
## Name of the node on which master data stores will be created if no other
## has already been specified by the user in :bro:see:`Cluster::stores`.
## An empty value means "use whatever name corresponds to the manager
## node".
const default_master_node = "" &redef;
## The type of data store backend that will be used for all data stores if
## no other has already been specified by the user in :bro:see:`Cluster::stores`.
const default_backend = Broker::MEMORY &redef;
## The type of persistent data store backend that will be used for all data
## stores if no other has already been specified by the user in
## :bro:see:`Cluster::stores`. This will be used when script authors call
## :bro:see:`Cluster::create_store` with the *persistent* argument set true.
const default_persistent_backend = Broker::SQLITE &redef;
## Setting a default dir will, for persistent backends that have not
## been given an explicit file path via :bro:see:`Cluster::stores`,
## automatically create a path within this dir that is based on the name of
## the data store.
const default_store_dir = "" &redef;
## Information regarding a cluster-enabled data store.
type StoreInfo: record {
## The name of the data store.
name: string &optional;
## The store handle.
store: opaque of Broker::Store &optional;
## The name of the cluster node on which the master version of the data
## store resides.
master_node: string &default=default_master_node;
## Whether the data store is the master version or a clone.
master: bool &default=F;
## The type of backend used for storing data.
backend: Broker::BackendType &default=default_backend;
## Parameters used for configuring the backend.
options: Broker::BackendOptions &default=Broker::BackendOptions();
## A resync/reconnect interval to pass through to
## :bro:see:`Broker::create_clone`.
clone_resync_interval: interval &default=Broker::default_clone_resync_interval;
## A staleness duration to pass through to
## :bro:see:`Broker::create_clone`.
clone_stale_interval: interval &default=Broker::default_clone_stale_interval;
## A mutation buffer interval to pass through to
## :bro:see:`Broker::create_clone`.
clone_mutation_buffer_interval: interval &default=Broker::default_clone_mutation_buffer_interval;
};
## A table of cluster-enabled data stores that have been created, indexed
## by their name. This table will be populated automatically by
## :bro:see:`Cluster::create_store`, but if you need to customize
## the options related to a particular data store, you may redef this
## table. Calls to :bro:see:`Cluster::create_store` will first check
## the table for an entry of the same name and, if found, will use the
## predefined options there when setting up the store.
global stores: table[string] of StoreInfo &default=StoreInfo() &redef;
## Sets up a cluster-enabled data store. They will also still properly
## function for uses that are not operating a cluster.
##
## name: the name of the data store to create.
##
## persistent: whether the data store must be persistent.
##
## Returns: the store's information. For master stores, the store will be
## ready to use immediately. For clones, the store field will not
## be set until the node containing the master store has connected.
global create_store: function(name: string, persistent: bool &default=F): StoreInfo;
## The cluster logging stream identifier.
redef enum Log::ID += { LOG };
@ -18,6 +119,8 @@ export {
type Info: record {
## The time at which a cluster message was generated.
ts: time;
## The name of the node that is creating the log record.
node: string;
## A message indicating information about the cluster's operation.
message: string;
} &log;
@ -46,43 +149,6 @@ export {
TIME_MACHINE,
};
## Events raised by a manager and handled by the workers.
const manager2worker_events = /Drop::.*/ &redef;
## Events raised by a manager and handled by proxies.
const manager2proxy_events = /EMPTY/ &redef;
## Events raised by a manager and handled by loggers.
const manager2logger_events = /EMPTY/ &redef;
## Events raised by proxies and handled by loggers.
const proxy2logger_events = /EMPTY/ &redef;
## Events raised by proxies and handled by a manager.
const proxy2manager_events = /EMPTY/ &redef;
## Events raised by proxies and handled by workers.
const proxy2worker_events = /EMPTY/ &redef;
## Events raised by workers and handled by loggers.
const worker2logger_events = /EMPTY/ &redef;
## Events raised by workers and handled by a manager.
const worker2manager_events = /(TimeMachine::command|Drop::.*)/ &redef;
## Events raised by workers and handled by proxies.
const worker2proxy_events = /EMPTY/ &redef;
## Events raised by TimeMachine instances and handled by a manager.
const tm2manager_events = /EMPTY/ &redef;
## Events raised by TimeMachine instances and handled by workers.
const tm2worker_events = /EMPTY/ &redef;
## Events sent by the control host (i.e., BroControl) when dynamically
## connecting to a running instance to update settings or request data.
const control_events = Control::controller_events &redef;
## Record type to indicate a node in a cluster.
type Node: record {
## Identifies the type of cluster node in this node's configuration.
@ -92,22 +158,17 @@ export {
## If the *ip* field is a non-global IPv6 address, this field
## can specify a particular :rfc:`4007` ``zone_id``.
zone_id: string &default="";
## The port to which this local node can connect when
## establishing communication.
## The port that this node will listen on for peer connections.
p: port;
## Identifier for the interface a worker is sniffing.
interface: string &optional;
## Name of the logger node this node uses. For manager, proxies and workers.
logger: string &optional;
## Name of the manager node this node uses. For workers and proxies.
manager: string &optional;
## Name of the proxy node this node uses. For workers and managers.
proxy: string &optional;
## Names of worker nodes that this node connects with.
## For managers and proxies.
workers: set[string] &optional;
## Name of a time machine node with which this node connects.
time_machine: string &optional;
## A unique identifier assigned to the node by the broker framework.
## This field is only set while a node is connected.
id: string &optional;
};
## This function can be called at any time to determine if the cluster
@ -134,6 +195,8 @@ export {
## named cluster-layout.bro somewhere in the BROPATH. It will be
## automatically loaded if the CLUSTER_NODE environment variable is set.
## Note that BroControl handles all of this automatically.
## The table is typically indexed by node names/labels (e.g. "manager"
## or "worker-1").
const nodes: table[string] of Node = {} &redef;
## Indicates whether or not the manager will act as the logger and receive
@ -148,8 +211,62 @@ export {
## Interval for retrying failed connections between cluster nodes.
const retry_interval = 1min &redef;
## When using broker-enabled cluster framework, nodes broadcast this event
## to exchange their user-defined name along with a string that uniquely
## identifies it for the duration of its lifetime. This string may change
## if the node dies and has to reconnect later.
global hello: event(name: string, id: string);
## When using broker-enabled cluster framework, this event will be emitted
## locally whenever a cluster node connects or reconnects.
global node_up: event(name: string, id: string);
## When using broker-enabled cluster framework, this event will be emitted
## locally whenever a connected cluster node becomes disconnected.
global node_down: event(name: string, id: string);
## Write a message to the cluster logging stream.
global log: function(msg: string);
## Retrieve the topic associated with a specific node in the cluster.
##
## name: the name of the cluster node (e.g. "manager").
##
## Returns: a topic string that may used to send a message exclusively to
## a given cluster node.
global node_topic: function(name: string): string;
}
type NamedNode: record {
name: string;
node: Node;
};
function nodes_with_type(node_type: NodeType): vector of NamedNode
{
local rval: vector of NamedNode = vector();
local names: vector of string = vector();
for ( name in Cluster::nodes )
names[|names|] = name;
names = sort(names, strcmp);
for ( i in names )
{
name = names[i];
local n = Cluster::nodes[name];
if ( n$node_type != node_type )
next;
rval[|rval|] = NamedNode($name=name, $node=n);
}
return rval;
}
function is_enabled(): bool
{
return (node != "");
@ -160,16 +277,64 @@ function local_node_type(): NodeType
return is_enabled() ? nodes[node]$node_type : NONE;
}
event remote_connection_handshake_done(p: event_peer) &priority=5
function node_topic(name: string): string
{
if ( p$descr in nodes && nodes[p$descr]$node_type == WORKER )
return node_topic_prefix + name;
}
event Cluster::hello(name: string, id: string) &priority=10
{
if ( name !in nodes )
{
Reporter::error(fmt("Got Cluster::hello msg from unexpected node: %s", name));
return;
}
local n = nodes[name];
if ( n?$id )
{
if ( n$id != id )
Reporter::error(fmt("Got Cluster::hello msg from duplicate node:%s",
name));
}
else
event Cluster::node_up(name, id);
n$id = id;
Cluster::log(fmt("got hello from %s (%s)", name, id));
if ( n$node_type == WORKER )
++worker_count;
}
event remote_connection_closed(p: event_peer) &priority=5
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
if ( p$descr in nodes && nodes[p$descr]$node_type == WORKER )
--worker_count;
if ( ! Cluster::is_enabled() )
return;
local e = Broker::make_event(Cluster::hello, node, Broker::node_id());
Broker::publish(Cluster::broadcast_topic, e);
}
event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string) &priority=10
{
for ( node_name in nodes )
{
local n = nodes[node_name];
if ( n?$id && n$id == endpoint$id )
{
Cluster::log(fmt("node down: %s", node_name));
delete n$id;
if ( n$node_type == WORKER )
--worker_count;
event Cluster::node_down(node_name, endpoint$id);
break;
}
}
}
event bro_init() &priority=5
@ -183,3 +348,90 @@ event bro_init() &priority=5
Log::create_stream(Cluster::LOG, [$columns=Info, $path="cluster"]);
}
function create_store(name: string, persistent: bool &default=F): Cluster::StoreInfo
{
local info = stores[name];
info$name = name;
if ( Cluster::default_store_dir != "" )
{
local default_options = Broker::BackendOptions();
local path = Cluster::default_store_dir + "/" + name;
if ( info$options$sqlite$path == default_options$sqlite$path )
info$options$sqlite$path = path + ".sqlite";
if ( info$options$rocksdb$path == default_options$rocksdb$path )
info$options$rocksdb$path = path + ".rocksdb";
}
if ( persistent )
{
switch ( info$backend ) {
case Broker::MEMORY:
info$backend = Cluster::default_persistent_backend;
break;
case Broker::SQLITE:
fallthrough;
case Broker::ROCKSDB:
# no-op: user already asked for a specific persistent backend.
break;
default:
Reporter::error(fmt("unhandled data store type: %s", info$backend));
break;
}
}
if ( ! Cluster::is_enabled() )
{
if ( info?$store )
{
Reporter::warning(fmt("duplicate cluster store creation for %s", name));
return info;
}
info$store = Broker::create_master(name, info$backend, info$options);
info$master = T;
stores[name] = info;
return info;
}
if ( info$master_node == "" )
{
local mgr_nodes = nodes_with_type(Cluster::MANAGER);
if ( |mgr_nodes| == 0 )
Reporter::fatal(fmt("empty master node name for cluster store " +
"'%s', but there's no manager node to default",
name));
info$master_node = mgr_nodes[0]$name;
}
else if ( info$master_node !in Cluster::nodes )
Reporter::fatal(fmt("master node '%s' for cluster store '%s' does not exist",
info$master_node, name));
if ( Cluster::node == info$master_node )
{
info$store = Broker::create_master(name, info$backend, info$options);
info$master = T;
stores[name] = info;
Cluster::log(fmt("created master store: %s", name));
return info;
}
info$master = F;
stores[name] = info;
info$store = Broker::create_clone(info$name,
info$clone_resync_interval,
info$clone_stale_interval,
info$clone_mutation_buffer_interval);
Cluster::log(fmt("created clone store: %s", info$name));
return info;
}
function log(msg: string)
{
Log::write(Cluster::LOG, [$ts = network_time(), $node = node, $message = msg]);
}

View file

@ -0,0 +1,440 @@
##! Defines an interface for managing pools of cluster nodes. Pools are
##! are useful way to distribute work or data among nodes within a cluster.
@load ./main
@load base/utils/hash_hrw
module Cluster;
export {
## Store state of a cluster within within the context of a work pool.
type PoolNode: record {
## The node name (e.g. "manager").
name: string;
## An alias of *name* used to prevent hashing collisions when creating
## *site_id*.
alias: string;
## A 32-bit unique identifier for the pool node, derived from name/alias.
site_id: count;
## Whether the node is currently alive and can receive work.
alive: bool &default=F;
};
## A pool specification.
type PoolSpec: record {
## A topic string that can be used to reach all nodes within a pool.
topic: string &default = "";
## The type of nodes that are contained within the pool.
node_type: Cluster::NodeType &default = Cluster::PROXY;
## The maximum number of nodes that may belong to the pool.
## If not set, then all available nodes will be added to the pool,
## else the cluster framework will automatically limit the pool
## membership according to the threshhold.
max_nodes: count &optional;
## Whether the pool requires exclusive access to nodes. If true,
## then *max_nodes* nodes will not be assigned to any other pool.
## When using this flag, *max_nodes* must also be set.
exclusive: bool &default = F;
};
type PoolNodeTable: table[string] of PoolNode;
type RoundRobinTable: table[string] of int;
## A pool used for distributing data/work among a set of cluster nodes.
type Pool: record {
## The specification of the pool that was used when registering it.
spec: PoolSpec &default = PoolSpec();
## Nodes in the pool, indexed by their name (e.g. "manager").
nodes: PoolNodeTable &default = PoolNodeTable();
## A list of nodes in the pool in a deterministic order.
node_list: vector of PoolNode &default = vector();
## The Rendezvous hashing structure.
hrw_pool: HashHRW::Pool &default = HashHRW::Pool();
## Round-Robin table indexed by arbitrary key and storing the next
## index of *node_list* that will be eligible to receive work (if it's
## alive at the time of next request).
rr_key_seq: RoundRobinTable &default = RoundRobinTable();
};
## The specification for :bro:see:`Cluster::proxy_pool`.
global proxy_pool_spec: PoolSpec =
PoolSpec($topic = "bro/cluster/pool/proxy",
$node_type = Cluster::PROXY) &redef;
## The specification for :bro:see:`Cluster::worker_pool`.
global worker_pool_spec: PoolSpec =
PoolSpec($topic = "bro/cluster/pool/worker",
$node_type = Cluster::WORKER) &redef;
## The specification for :bro:see:`Cluster::logger_pool`.
global logger_pool_spec: PoolSpec =
PoolSpec($topic = "bro/cluster/pool/logger",
$node_type = Cluster::LOGGER) &redef;
## A pool containing all the proxy nodes of a cluster.
## The pool's node membership/availability is automatically
## maintained by the cluster framework.
global proxy_pool: Pool;
## A pool containing all the worker nodes of a cluster.
## The pool's node membership/availability is automatically
## maintained by the cluster framework.
global worker_pool: Pool;
## A pool containing all the logger nodes of a cluster.
## The pool's node membership/availability is automatically
## maintained by the cluster framework.
global logger_pool: Pool;
## Registers and initializes a pool.
global register_pool: function(spec: PoolSpec): Pool;
## Retrieve the topic associated with the node mapped via Rendezvous hash
## of an arbitrary key.
##
## pool: the pool of nodes to consider.
##
## key: data used for input to the hashing function that will uniformly
## distribute keys among available nodes.
##
## Returns: a topic string associated with a cluster node that is alive
## or an empty string if nothing is alive.
global hrw_topic: function(pool: Pool, key: any): string;
## Retrieve the topic associated with the node in a round-robin fashion.
##
## pool: the pool of nodes to consider.
##
## key: an arbitrary string to identify the purpose for which you're
## requesting the topic. e.g. consider using namespacing of your script
## like "Intel::cluster_rr_key".
##
## Returns: a topic string associated with a cluster node that is alive,
## or an empty string if nothing is alive.
global rr_topic: function(pool: Pool, key: string): string;
## Distributes log message topics among logger nodes via round-robin.
## This will be automatically assigned to :bro:see:`Broker::log_topic`
## if :bro:see:`Cluster::enable_round_robin_logging` is enabled.
## If no logger nodes are active, then this will return the value
## of :bro:see:`Broker::default_log_topic`.
global rr_log_topic: function(id: Log::ID, path: string): string;
}
## Initialize a node as a member of a pool.
##
## pool: the pool to which the node will belong.
##
## name: the name of the node (e.g. "manager").
##
## Returns: F if a node of the same name already exists in the pool, else T.
global init_pool_node: function(pool: Pool, name: string): bool;
## Mark a pool node as alive/online/available. :bro:see:`Cluster::hrw_topic`
## will distribute keys to nodes marked as alive.
##
## pool: the pool to which the node belongs.
##
## name: the name of the node to mark.
##
## Returns: F if the node does not exist in the pool, else T.
global mark_pool_node_alive: function(pool: Pool, name: string): bool;
## Mark a pool node as dead/offline/unavailable. :bro:see:`Cluster::hrw_topic`
## will not distribute keys to nodes marked as dead.
##
## pool: the pool to which the node belongs.
##
## name: the name of the node to mark.
##
## Returns: F if the node does not exist in the pool, else T.
global mark_pool_node_dead: function(pool: Pool, name: string): bool;
global registered_pools: vector of Pool = vector();
function register_pool(spec: PoolSpec): Pool
{
local rval = Pool($spec = spec);
registered_pools[|registered_pools|] = rval;
return rval;
}
function hrw_topic(pool: Pool, key: any): string
{
if ( |pool$hrw_pool$sites| == 0 )
return "";
local site = HashHRW::get_site(pool$hrw_pool, key);
local pn: PoolNode = site$user_data;
return node_topic_prefix + pn$name;
}
function rr_topic(pool: Pool, key: string): string
{
if ( key !in pool$rr_key_seq )
pool$rr_key_seq[key] = 0;
local next_idx = pool$rr_key_seq[key];
local start = next_idx;
local rval = "";
if ( next_idx >= |pool$node_list| )
return rval;
while ( T )
{
local pn = pool$node_list[next_idx];
++next_idx;
if ( next_idx == |pool$node_list| )
next_idx = 0;
if ( pn$alive )
{
rval = node_topic_prefix + pn$name;
break;
}
if ( next_idx == start )
# no nodes alive
break;
}
pool$rr_key_seq[key] = next_idx;
return rval;
}
function rr_log_topic(id: Log::ID, path: string): string
{
local rval = rr_topic(logger_pool, "Cluster::rr_log_topic");
if ( rval != "" )
return rval;
rval = Broker::default_log_topic(id, path);
return rval;
}
event Cluster::node_up(name: string, id: string) &priority=10
{
for ( i in registered_pools )
{
local pool = registered_pools[i];
if ( name in pool$nodes )
mark_pool_node_alive(pool, name);
}
}
event Cluster::node_down(name: string, id: string) &priority=10
{
for ( i in registered_pools )
{
local pool = registered_pools[i];
if ( name in pool$nodes )
mark_pool_node_dead(pool, name);
}
}
function site_id_in_pool(pool: Pool, site_id: count): bool
{
for ( i in pool$nodes )
{
local pn = pool$nodes[i];
if ( pn$site_id == site_id )
return T;
}
return F;
}
function init_pool_node(pool: Pool, name: string): bool
{
if ( name in pool$nodes )
return F;
local loop = T;
local c = 0;
while ( loop )
{
# site id collisions are unlikely, but using aliases handles it...
# alternatively could terminate and ask user to pick a new node name
# if it ends up colliding.
local alias = name + fmt(".%s", c);
local site_id = fnv1a32(alias);
if ( site_id_in_pool(pool, site_id) )
++c;
else
{
local pn = PoolNode($name=name, $alias=alias, $site_id=site_id,
$alive=Cluster::node == name);
pool$nodes[name] = pn;
pool$node_list[|pool$node_list|] = pn;
loop = F;
}
}
return T;
}
function mark_pool_node_alive(pool: Pool, name: string): bool
{
if ( name !in pool$nodes )
return F;
local pn = pool$nodes[name];
pn$alive = T;
HashHRW::add_site(pool$hrw_pool, HashHRW::Site($id=pn$site_id, $user_data=pn));
return T;
}
function mark_pool_node_dead(pool: Pool, name: string): bool
{
if ( name !in pool$nodes )
return F;
local pn = pool$nodes[name];
pn$alive = F;
HashHRW::rem_site(pool$hrw_pool, HashHRW::Site($id=pn$site_id, $user_data=pn));
return T;
}
event bro_init()
{
worker_pool = register_pool(worker_pool_spec);
proxy_pool = register_pool(proxy_pool_spec);
logger_pool = register_pool(logger_pool_spec);
}
type PoolEligibilityTracking: record {
eligible_nodes: vector of NamedNode &default = vector();
next_idx: count &default = 0;
excluded: count &default = 0;
};
global pool_eligibility: table[Cluster::NodeType] of PoolEligibilityTracking = table();
function pool_sorter(a: Pool, b: Pool): int
{
return strcmp(a$spec$topic, b$spec$topic);
}
# Needs to execute before the bro_init in setup-connections
event bro_init() &priority=-5
{
if ( ! Cluster::is_enabled() )
return;
# Sorting now ensures the node distribution process is stable even if
# there's a change in the order of time-of-registration between Bro runs.
sort(registered_pools, pool_sorter);
pool_eligibility[Cluster::WORKER] =
PoolEligibilityTracking($eligible_nodes = nodes_with_type(Cluster::WORKER));
pool_eligibility[Cluster::PROXY] =
PoolEligibilityTracking($eligible_nodes = nodes_with_type(Cluster::PROXY));
pool_eligibility[Cluster::LOGGER] =
PoolEligibilityTracking($eligible_nodes = nodes_with_type(Cluster::LOGGER));
if ( manager_is_logger )
{
local mgr = nodes_with_type(Cluster::MANAGER);
if ( |mgr| > 0 )
{
local eln = pool_eligibility[Cluster::LOGGER]$eligible_nodes;
eln[|eln|] = mgr[0];
}
}
local pool: Pool;
local pet: PoolEligibilityTracking;
local en: vector of NamedNode;
for ( i in registered_pools )
{
pool = registered_pools[i];
if ( pool$spec$node_type !in pool_eligibility )
Reporter::fatal(fmt("invalid pool node type: %s", pool$spec$node_type));
if ( ! pool$spec$exclusive )
next;
if ( ! pool$spec?$max_nodes )
Reporter::fatal("Cluster::PoolSpec 'max_nodes' field must be set when using the 'exclusive' flag");
pet = pool_eligibility[pool$spec$node_type];
pet$excluded += pool$spec$max_nodes;
}
for ( nt in pool_eligibility )
{
pet = pool_eligibility[nt];
if ( pet$excluded > |pet$eligible_nodes| )
Reporter::fatal(fmt("not enough %s nodes to satisfy pool exclusivity requirements: need %d nodes", nt, pet$excluded));
}
for ( i in registered_pools )
{
pool = registered_pools[i];
if ( ! pool$spec$exclusive )
next;
pet = pool_eligibility[pool$spec$node_type];
local e = 0;
while ( e < pool$spec$max_nodes )
{
init_pool_node(pool, pet$eligible_nodes[e]$name);
++e;
}
local nen: vector of NamedNode = vector();
for ( j in pet$eligible_nodes )
{
if ( j < e )
next;
nen[|nen|] = pet$eligible_nodes[j];
}
pet$eligible_nodes = nen;
}
for ( i in registered_pools )
{
pool = registered_pools[i];
if ( pool$spec$exclusive )
next;
pet = pool_eligibility[pool$spec$node_type];
local nodes_to_init = |pet$eligible_nodes|;
if ( pool$spec?$max_nodes &&
pool$spec$max_nodes < |pet$eligible_nodes| )
nodes_to_init = pool$spec$max_nodes;
local nodes_inited = 0;
while ( nodes_inited < nodes_to_init )
{
init_pool_node(pool, pet$eligible_nodes[pet$next_idx]$name);
++nodes_inited;
++pet$next_idx;
if ( pet$next_idx == |pet$eligible_nodes| )
pet$next_idx = 0;
}
}
}

View file

@ -2,142 +2,122 @@
##! as defined by :bro:id:`Cluster::nodes`.
@load ./main
@load base/frameworks/communication
@if ( Cluster::node in Cluster::nodes )
@load ./pools
@load base/frameworks/broker
module Cluster;
event bro_init() &priority=9
function connect_peer(node_type: NodeType, node_name: string)
{
local me = nodes[node];
local nn = nodes_with_type(node_type);
for ( i in Cluster::nodes )
for ( i in nn )
{
local n = nodes[i];
local n = nn[i];
# Connections from the control node for runtime control and update events.
# Every node in a cluster is eligible for control from this host.
if ( n$node_type == CONTROL )
Communication::nodes["control"] = [$host=n$ip, $zone_id=n$zone_id,
$connect=F, $class="control",
$events=control_events];
if ( n$name != node_name )
next;
if ( me$node_type == LOGGER )
{
if ( n$node_type == MANAGER && n$logger == node )
Communication::nodes[i] =
[$host=n$ip, $zone_id=n$zone_id, $connect=F,
$class=i, $events=manager2logger_events, $request_logs=T];
if ( n$node_type == PROXY && n$logger == node )
Communication::nodes[i] =
[$host=n$ip, $zone_id=n$zone_id, $connect=F,
$class=i, $events=proxy2logger_events, $request_logs=T];
if ( n$node_type == WORKER && n$logger == node )
Communication::nodes[i] =
[$host=n$ip, $zone_id=n$zone_id, $connect=F,
$class=i, $events=worker2logger_events, $request_logs=T];
}
else if ( me$node_type == MANAGER )
{
if ( n$node_type == LOGGER && me$logger == i )
Communication::nodes["logger"] =
[$host=n$ip, $zone_id=n$zone_id, $p=n$p,
$connect=T, $retry=retry_interval,
$class=node];
if ( n$node_type == WORKER && n$manager == node )
Communication::nodes[i] =
[$host=n$ip, $zone_id=n$zone_id, $connect=F,
$class=i, $events=worker2manager_events,
$request_logs=Cluster::manager_is_logger];
if ( n$node_type == PROXY && n$manager == node )
Communication::nodes[i] =
[$host=n$ip, $zone_id=n$zone_id, $connect=F,
$class=i, $events=proxy2manager_events,
$request_logs=Cluster::manager_is_logger];
if ( n$node_type == TIME_MACHINE && me?$time_machine && me$time_machine == i )
Communication::nodes["time-machine"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=retry_interval,
$events=tm2manager_events];
}
else if ( me$node_type == PROXY )
{
if ( n$node_type == LOGGER && me$logger == i )
Communication::nodes["logger"] =
[$host=n$ip, $zone_id=n$zone_id, $p=n$p,
$connect=T, $retry=retry_interval,
$class=node];
if ( n$node_type == WORKER && n$proxy == node )
Communication::nodes[i] =
[$host=n$ip, $zone_id=n$zone_id, $connect=F, $class=i,
$sync=T, $auth=T, $events=worker2proxy_events];
# accepts connections from the previous one.
# (This is not ideal for setups with many proxies)
# FIXME: Once we're using multiple proxies, we should also figure out some $class scheme ...
if ( n$node_type == PROXY )
{
if ( n?$proxy )
Communication::nodes[i]
= [$host=n$ip, $zone_id=n$zone_id, $p=n$p,
$connect=T, $auth=F, $sync=T, $retry=retry_interval];
else if ( me?$proxy && me$proxy == i )
Communication::nodes[me$proxy]
= [$host=nodes[i]$ip, $zone_id=nodes[i]$zone_id,
$connect=F, $auth=T, $sync=T];
}
# Finally the manager, to send it status updates.
if ( n$node_type == MANAGER && me$manager == i )
Communication::nodes["manager"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=retry_interval,
$class=node,
$events=manager2proxy_events];
}
else if ( me$node_type == WORKER )
{
if ( n$node_type == LOGGER && me$logger == i )
Communication::nodes["logger"] =
[$host=n$ip, $zone_id=n$zone_id, $p=n$p,
$connect=T, $retry=retry_interval,
$class=node];
if ( n$node_type == MANAGER && me$manager == i )
Communication::nodes["manager"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=retry_interval,
$class=node,
$events=manager2worker_events];
if ( n$node_type == PROXY && me$proxy == i )
Communication::nodes["proxy"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T, $retry=retry_interval,
$sync=T, $class=node,
$events=proxy2worker_events];
if ( n$node_type == TIME_MACHINE &&
me?$time_machine && me$time_machine == i )
Communication::nodes["time-machine"] = [$host=nodes[i]$ip,
$zone_id=nodes[i]$zone_id,
$p=nodes[i]$p,
$connect=T,
$retry=retry_interval,
$events=tm2worker_events];
}
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
}
}
@endif
function connect_peers_with_type(node_type: NodeType)
{
local rval: vector of NamedNode = vector();
local nn = nodes_with_type(node_type);
for ( i in nn )
{
local n = nn[i];
local status = Broker::peer(cat(n$node$ip), n$node$p,
Cluster::retry_interval);
Cluster::log(fmt("initiate peering with %s:%s, retry=%s, status=%s",
n$node$ip, n$node$p, Cluster::retry_interval,
status));
}
}
event bro_init() &priority=-10
{
local self = nodes[node];
for ( i in registered_pools )
{
local pool = registered_pools[i];
if ( node in pool$nodes )
Broker::subscribe(pool$spec$topic);
}
switch ( self$node_type ) {
case NONE:
return;
case CONTROL:
break;
case LOGGER:
Broker::subscribe(Cluster::logger_topic);
Broker::subscribe(Broker::default_log_topic_prefix);
break;
case MANAGER:
Broker::subscribe(Cluster::manager_topic);
if ( Cluster::manager_is_logger )
Broker::subscribe(Broker::default_log_topic_prefix);
break;
case PROXY:
Broker::subscribe(Cluster::proxy_topic);
break;
case WORKER:
Broker::subscribe(Cluster::worker_topic);
break;
case TIME_MACHINE:
Broker::subscribe(Cluster::time_machine_topic);
break;
default:
Reporter::error(fmt("Unhandled cluster node type: %s", self$node_type));
return;
}
Broker::subscribe(Cluster::broadcast_topic);
Broker::subscribe(node_topic(node));
Broker::listen(Broker::default_listen_address,
self$p,
Broker::default_listen_retry);
Cluster::log(fmt("listening on %s:%s", Broker::default_listen_address, self$p));
switch ( self$node_type ) {
case MANAGER:
connect_peers_with_type(LOGGER);
if ( self?$time_machine )
connect_peer(TIME_MACHINE, self$time_machine);
break;
case PROXY:
connect_peers_with_type(LOGGER);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
break;
case WORKER:
connect_peers_with_type(LOGGER);
connect_peers_with_type(PROXY);
if ( self?$manager )
connect_peer(MANAGER, self$manager);
if ( self?$time_machine )
connect_peer(TIME_MACHINE, self$time_machine);
break;
}
}

View file

@ -1,2 +0,0 @@
The communication framework facilitates connecting to remote Bro or
Broccoli instances to share state and transfer events.

View file

@ -1 +0,0 @@
@load ./main

View file

@ -1,354 +0,0 @@
##! Facilitates connecting to remote Bro or Broccoli instances to share state
##! and/or transfer events.
@load base/frameworks/packet-filter
@load base/utils/addrs
module Communication;
export {
## The communication logging stream identifier.
redef enum Log::ID += { LOG };
## Which interface to listen on. The addresses ``0.0.0.0`` and ``[::]``
## are wildcards.
const listen_interface = 0.0.0.0 &redef;
## Which port to listen on. Note that BroControl sets this
## automatically.
const listen_port = 47757/tcp &redef;
## This defines if a listening socket should use SSL.
const listen_ssl = F &redef;
## Defines if a listening socket can bind to IPv6 addresses.
##
## Note that this is overridden by the BroControl IPv6Comm option.
const listen_ipv6 = F &redef;
## If :bro:id:`Communication::listen_interface` is a non-global
## IPv6 address and requires a specific :rfc:`4007` ``zone_id``,
## it can be specified here.
const listen_ipv6_zone_id = "" &redef;
## Defines the interval at which to retry binding to
## :bro:id:`Communication::listen_interface` on
## :bro:id:`Communication::listen_port` if it's already in use.
const listen_retry = 30 secs &redef;
## Default compression level. Compression level is 0-9, with 0 = no
## compression.
global compression_level = 0 &redef;
## A record type containing the column fields of the communication log.
type Info: record {
## The network time at which a communication event occurred.
ts: time &log;
## The peer name (if any) with which a communication event is
## concerned.
peer: string &log &optional;
## Where the communication event message originated from, that
## is, either from the scripting layer or inside the Bro process.
src_name: string &log &optional;
## .. todo:: currently unused.
connected_peer_desc: string &log &optional;
## .. todo:: currently unused.
connected_peer_addr: addr &log &optional;
## .. todo:: currently unused.
connected_peer_port: port &log &optional;
## The severity of the communication event message.
level: string &log &optional;
## A message describing the communication event between Bro or
## Broccoli instances.
message: string &log;
};
## A remote peer to which we would like to talk.
## If there's no entry for a peer, it may still connect
## and request state, but not send us any.
type Node: record {
## Remote address.
host: addr;
## If the *host* field is a non-global IPv6 address, this field
## can specify a particular :rfc:`4007` ``zone_id``.
zone_id: string &optional;
## Port of the remote Bro communication endpoint if we are
## initiating the connection (based on the *connect* field).
p: port &optional;
## When accepting a connection, the configuration only
## applies if the class matches the one transmitted by
## the peer.
##
## When initiating a connection, the class is sent to
## the other side.
class: string &optional;
## Events requested from remote side.
events: pattern &optional;
## Whether we are going to connect (rather than waiting
## for the other side to connect to us).
connect: bool &default = F;
## If disconnected, reconnect after this many seconds.
retry: interval &default = 0 secs;
## Whether to accept remote events.
accept_input: bool &default = T;
## Whether to perform state synchronization with peer.
sync: bool &default = F;
## Whether to request logs from the peer.
request_logs: bool &default = F;
## When performing state synchronization, whether we consider
## our state to be authoritative (only one side can be
## authoritative). If so, we will send the peer our current
## set when the connection is set up.
auth: bool &default = F;
## If not set, no capture filter is sent.
## If set to an empty string, then the default capture filter
## is sent.
capture_filter: string &optional;
## Whether to use SSL-based communication.
ssl: bool &default = F;
## Compression level is 0-9, with 0 = no compression.
compression: count &default = compression_level;
## The remote peer.
peer: event_peer &optional;
## Indicates the status of the node.
connected: bool &default = F;
};
## The table of Bro or Broccoli nodes that Bro will initiate connections
## to or respond to connections from. Note that BroControl sets this
## automatically.
global nodes: table[string] of Node &redef;
## A table of peer nodes for which this node issued a
## :bro:id:`Communication::connect_peer` call but with which a connection
## has not yet been established or with which a connection has been
## closed and is currently in the process of retrying to establish.
## When a connection is successfully established, the peer is removed
## from the table.
global pending_peers: table[peer_id] of Node;
## A table of peer nodes for which this node has an established connection.
## Peers are automatically removed if their connection is closed and
## automatically added back if a connection is re-established later.
global connected_peers: table[peer_id] of Node;
## Connect to a node in :bro:id:`Communication::nodes` independent
## of its "connect" flag.
##
## peer: the string used to index a particular node within the
## :bro:id:`Communication::nodes` table.
global connect_peer: function(peer: string);
}
const src_names = {
[REMOTE_SRC_CHILD] = "child",
[REMOTE_SRC_PARENT] = "parent",
[REMOTE_SRC_SCRIPT] = "script",
};
event bro_init() &priority=5
{
Log::create_stream(Communication::LOG, [$columns=Info, $path="communication"]);
}
function do_script_log_common(level: count, src: count, msg: string)
{
Log::write(Communication::LOG, [$ts = network_time(),
$level = (level == REMOTE_LOG_INFO ? "info" : "error"),
$src_name = src_names[src],
$peer = get_event_peer()$descr,
$message = msg]);
}
# This is a core generated event.
event remote_log(level: count, src: count, msg: string)
{
do_script_log_common(level, src, msg);
}
# This is a core generated event.
event remote_log_peer(p: event_peer, level: count, src: count, msg: string)
{
local rmsg = fmt("[#%d/%s:%d] %s", p$id, addr_to_uri(p$host), p$p, msg);
do_script_log_common(level, src, rmsg);
}
function do_script_log(p: event_peer, msg: string)
{
do_script_log_common(REMOTE_LOG_INFO, REMOTE_SRC_SCRIPT, msg);
}
function connect_peer(peer: string)
{
local node = nodes[peer];
local p = listen_port;
if ( node?$p )
p = node$p;
local class = node?$class ? node$class : "";
local zone_id = node?$zone_id ? node$zone_id : "";
local id = connect(node$host, zone_id, p, class, node$retry, node$ssl);
if ( id == PEER_ID_NONE )
Log::write(Communication::LOG, [$ts = network_time(),
$peer = get_event_peer()$descr,
$message = "can't trigger connect"]);
pending_peers[id] = node;
}
function setup_peer(p: event_peer, node: Node)
{
if ( node?$events )
{
do_script_log(p, fmt("requesting events matching %s", node$events));
request_remote_events(p, node$events);
}
if ( node?$capture_filter && node$capture_filter != "" )
{
local filter = node$capture_filter;
do_script_log(p, fmt("sending capture_filter: %s", filter));
send_capture_filter(p, filter);
}
if ( node$accept_input )
{
do_script_log(p, "accepting state");
set_accept_state(p, T);
}
set_compression_level(p, node$compression);
if ( node$sync )
{
do_script_log(p, "requesting synchronized state");
request_remote_sync(p, node$auth);
}
if ( node$request_logs )
{
do_script_log(p, "requesting logs");
request_remote_logs(p);
}
node$peer = p;
node$connected = T;
connected_peers[p$id] = node;
}
event remote_connection_established(p: event_peer)
{
if ( is_remote_event() )
return;
do_script_log(p, "connection established");
if ( p$id in pending_peers )
{
# We issued the connect.
local node = pending_peers[p$id];
setup_peer(p, node);
delete pending_peers[p$id];
}
else
{ # The other side connected to us.
local found = F;
for ( i in nodes )
{
node = nodes[i];
if ( node$host == p$host )
{
local c = 0;
# See if classes match = either both have
# the same class, or neither of them has
# a class.
if ( p?$class && p$class != "" )
++c;
if ( node?$class && node$class != "" )
++c;
if ( c == 1 ||
(c == 2 && p$class != node$class) )
next;
found = T;
setup_peer(p, node);
break;
}
}
if ( ! found )
set_compression_level(p, compression_level);
}
complete_handshake(p);
}
event remote_connection_closed(p: event_peer)
{
if ( is_remote_event() )
return;
do_script_log(p, "connection closed");
if ( p$id in connected_peers )
{
local node = connected_peers[p$id];
node$connected = F;
delete connected_peers[p$id];
if ( node$retry != 0secs )
# The core will retry.
pending_peers[p$id] = node;
}
}
event remote_state_inconsistency(operation: string, id: string,
expected_old: string, real_old: string)
{
if ( is_remote_event() )
return;
local msg = fmt("state inconsistency: %s should be %s but is %s before %s",
id, expected_old, real_old, operation);
Log::write(Communication::LOG, [$ts = network_time(),
$peer = get_event_peer()$descr,
$message = msg]);
}
# Actually initiate the connections that need to be established.
event bro_init() &priority = -10 # let others modify nodes
{
if ( |nodes| > 0 )
enable_communication();
for ( tag in nodes )
{
if ( ! nodes[tag]$connect )
next;
connect_peer(tag);
}
}

View file

@ -5,6 +5,13 @@
module Control;
export {
## The topic prefix used for exchanging control messages via Broker.
const topic_prefix = "bro/control";
## Whether the controllee should call :bro:see:`Broker::listen`.
## In a cluster, this isn't needed since the setup process calls it.
const controllee_listen = T &redef;
## The address of the host that will be controlled.
const host = 0.0.0.0 &redef;
@ -22,12 +29,6 @@ export {
## This can be used by commands that take an argument.
const arg = "" &redef;
## Events that need to be handled by controllers.
const controller_events = /Control::.*_request/ &redef;
## Events that need to be handled by controllees.
const controllee_events = /Control::.*_response/ &redef;
## The commands that can currently be given on the command line for
## remote control.
const commands: set[string] = {
@ -73,8 +74,7 @@ export {
global shutdown_response: event();
}
event terminate_event()
{
terminate_communication();
terminate();
}

View file

@ -6,69 +6,96 @@
module Intel;
redef record Item += {
## This field is used internally for cluster transparency to avoid
## re-dispatching intelligence items over and over from workers.
first_dispatch: bool &default=T;
};
export {
## Broker topic for management of intel items. Sending insert_item and
## remove_item events, peers can manage intelligence data.
const item_topic = "bro/intel/items" &redef;
## Broker topic for management of intel indicators as stored on workers
## for matching. Sending insert_indicator and remove_indicator events,
## the back-end manages indicators.
const indicator_topic = "bro/intel/indicators" &redef;
## Broker topic for matching events, generated by workers and sent to
## the back-end for metadata enrichment and logging.
const match_topic = "bro/intel/match" &redef;
}
# Internal events for cluster data distribution.
global insert_item: event(item: Item);
global insert_indicator: event(item: Item);
# If this process is not a manager process, we don't want the full metadata.
@if ( Cluster::local_node_type() != Cluster::MANAGER )
redef have_full_data = F;
@endif
# Internal event for cluster data distribution.
global cluster_new_item: event(item: Item);
# Primary intelligence management is done by the manager.
# The manager informs the workers about new items and item removal.
redef Cluster::manager2worker_events += /^Intel::(cluster_new_item|purge_item)$/;
# A worker queries the manager to insert, remove or indicate the match of an item.
redef Cluster::worker2manager_events += /^Intel::(cluster_new_item|remove_item|match_no_items)$/;
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event bro_init()
{
Broker::subscribe(item_topic);
Broker::subscribe(match_topic);
Broker::auto_publish(indicator_topic, remove_indicator);
}
# Handling of new worker nodes.
event remote_connection_handshake_done(p: event_peer)
event Cluster::node_up(name: string, id: string)
{
# When a worker connects, send it the complete minimal data store.
# It will be kept up to date after this by the cluster_new_item event.
if ( p$descr in Cluster::nodes && Cluster::nodes[p$descr]$node_type == Cluster::WORKER )
# It will be kept up to date after this by the insert_indicator event.
if ( name in Cluster::nodes && Cluster::nodes[name]$node_type == Cluster::WORKER )
{
send_id(p, "Intel::min_data_store");
Broker::publish_id(Cluster::node_topic(name), "Intel::min_data_store");
}
}
# Handling of matches triggered by worker nodes.
event Intel::match_no_items(s: Seen) &priority=5
# On the manager, the new_item event indicates a new indicator that
# has to be distributed.
event Intel::new_item(item: Item) &priority=5
{
Broker::publish(indicator_topic, Intel::insert_indicator, item);
}
# Handling of item insertion triggered by remote node.
event Intel::insert_item(item: Intel::Item) &priority=5
{
Intel::_insert(item, T);
}
# Handling of item removal triggered by remote node.
event Intel::remove_item(item: Item, purge_indicator: bool) &priority=5
{
remove(item, purge_indicator);
}
# Handling of match triggered by remote node.
event Intel::match_remote(s: Seen) &priority=5
{
if ( Intel::find(s) )
event Intel::match(s, Intel::get_items(s));
}
# Handling of item removal triggered by worker nodes.
event Intel::remove_item(item: Item, purge_indicator: bool)
{
remove(item, purge_indicator);
}
@endif
# Handling of item insertion.
event Intel::new_item(item: Intel::Item) &priority=5
@if ( Cluster::local_node_type() == Cluster::WORKER )
event bro_init()
{
# The cluster manager always rebroadcasts intelligence.
# Workers redistribute it if it was locally generated.
if ( Cluster::local_node_type() == Cluster::MANAGER ||
item$first_dispatch )
{
item$first_dispatch=F;
event Intel::cluster_new_item(item);
}
Broker::subscribe(indicator_topic);
Broker::auto_publish(match_topic, match_remote);
Broker::auto_publish(item_topic, remove_item);
}
# Handling of item insertion by remote node.
event Intel::cluster_new_item(item: Intel::Item) &priority=5
# On a worker, the new_item event requires to trigger the insertion
# on the manager to update the back-end data store.
event Intel::new_item(item: Intel::Item) &priority=5
{
# Ignore locally generated events to avoid event storms.
if ( is_remote_event() )
Intel::insert(item);
Broker::publish(item_topic, Intel::insert_item, item);
}
# Handling of new indicators published by the manager.
event Intel::insert_indicator(item: Intel::Item) &priority=5
{
Intel::_insert(item, F);
}
@endif

View file

@ -177,12 +177,12 @@ export {
}
# Internal handler for matches with no metadata available.
global match_no_items: event(s: Seen);
global match_remote: event(s: Seen);
# Internal events for cluster data distribution.
# Internal events for (cluster) data distribution.
global new_item: event(item: Item);
global remove_item: event(item: Item, purge_indicator: bool);
global purge_item: event(item: Item);
global remove_indicator: event(item: Item);
# Optionally store metadata. This is used internally depending on
# if this is a cluster deployment or not.
@ -357,7 +357,7 @@ function Intel::seen(s: Seen)
}
else
{
event Intel::match_no_items(s);
event Intel::match_remote(s);
}
}
}
@ -389,9 +389,11 @@ hook extend_match(info: Info, s: Seen, items: set[Item]) &priority=5
}
}
function insert(item: Item)
# Function to insert metadata of an item. The function returns T
# if the given indicator is new.
function insert_meta_data(item: Item): bool
{
# Create and fill out the metadata item.
# Prepare the metadata entry.
local meta = item$meta;
local meta_tbl: table [string] of MetaData;
local is_new: bool = T;
@ -399,11 +401,11 @@ function insert(item: Item)
# All intelligence is case insensitive at the moment.
local lower_indicator = to_lower(item$indicator);
if ( item$indicator_type == ADDR )
switch ( item$indicator_type )
{
local host = to_addr(item$indicator);
if ( have_full_data )
{
case ADDR:
local host = to_addr(item$indicator);
if ( host !in data_store$host_data )
data_store$host_data[host] = table();
else
@ -414,15 +416,10 @@ function insert(item: Item)
}
meta_tbl = data_store$host_data[host];
}
break;
case SUBNET:
local net = to_subnet(item$indicator);
add min_data_store$host_data[host];
}
else if ( item$indicator_type == SUBNET )
{
local net = to_subnet(item$indicator);
if ( have_full_data )
{
if ( !check_subnet(net, data_store$subnet_data) )
data_store$subnet_data[net] = table();
else
@ -433,14 +430,8 @@ function insert(item: Item)
}
meta_tbl = data_store$subnet_data[net];
}
add min_data_store$subnet_data[net];
}
else
{
if ( have_full_data )
{
break;
default:
if ( [lower_indicator, item$indicator_type] !in data_store$string_data )
data_store$string_data[lower_indicator, item$indicator_type] = table();
else
@ -452,23 +443,59 @@ function insert(item: Item)
}
meta_tbl = data_store$string_data[lower_indicator, item$indicator_type];
}
break;
}
add min_data_store$string_data[lower_indicator, item$indicator_type];
# Insert new metadata or update if already present.
meta_tbl[meta$source] = meta;
return is_new;
}
# Function to encapsulate insertion logic. The first_dispatch parameter
# indicates whether the item might be new for other nodes.
function _insert(item: Item, first_dispatch: bool &default = T)
{
# Assume that the item is new by default.
local is_new: bool = T;
# All intelligence is case insensitive at the moment.
local lower_indicator = to_lower(item$indicator);
# Insert indicator into MinDataStore (might exist already).
switch ( item$indicator_type )
{
case ADDR:
local host = to_addr(item$indicator);
add min_data_store$host_data[host];
break;
case SUBNET:
local net = to_subnet(item$indicator);
add min_data_store$subnet_data[net];
break;
default:
add min_data_store$string_data[lower_indicator, item$indicator_type];
break;
}
if ( have_full_data )
{
# Insert new metadata or update if already present
meta_tbl[meta$source] = meta;
# Insert new metadata or update if already present.
is_new = insert_meta_data(item);
}
if ( is_new )
# Trigger insert for cluster in case the item is new
# or insert was called on a worker
if ( first_dispatch && is_new )
# Announce a (possibly) new item if this is the first dispatch and
# we know it is new or have to assume that on a worker.
event Intel::new_item(item);
}
function insert(item: Item)
{
# Insert possibly new item.
_insert(item, T);
}
# Function to check whether an item is present.
function item_exists(item: Item): bool
{
@ -549,12 +576,12 @@ function remove(item: Item, purge_indicator: bool)
break;
}
# Trigger deletion in minimal data stores
event Intel::purge_item(item);
event Intel::remove_indicator(item);
}
}
# Handling of indicator removal in minimal data stores.
event purge_item(item: Item)
event remove_indicator(item: Item)
{
switch ( item$indicator_type )
{
@ -571,4 +598,3 @@ event purge_item(item: Item)
break;
}
}

View file

@ -225,9 +225,22 @@ global blocks: table[addr] of BlockInfo = {}
@if ( Cluster::is_enabled() )
@load base/frameworks/cluster
redef Cluster::manager2worker_events += /NetControl::catch_release_block_(new|delete)/;
redef Cluster::worker2manager_events += /NetControl::catch_release_(add|delete|encountered)/;
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event bro_init()
{
Broker::auto_publish(Cluster::worker_topic, NetControl::catch_release_block_new);
Broker::auto_publish(Cluster::worker_topic, NetControl::catch_release_block_delete);
}
@else
event bro_init()
{
Broker::auto_publish(Cluster::manager_topic, NetControl::catch_release_add);
Broker::auto_publish(Cluster::manager_topic, NetControl::catch_release_delete);
Broker::auto_publish(Cluster::manager_topic, NetControl::catch_release_encountered);
}
@endif
@endif
function cr_check_rule(r: Rule): bool

View file

@ -16,10 +16,25 @@ export {
global cluster_netcontrol_delete_rule: event(id: string, reason: string);
}
## Workers need ability to forward commands to manager.
redef Cluster::worker2manager_events += /NetControl::cluster_netcontrol_(add|remove|delete)_rule/;
## Workers need to see the result events from the manager.
redef Cluster::manager2worker_events += /NetControl::rule_(added|removed|timeout|error|exists|new|destroyed)/;
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event bro_init()
{
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_added);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_removed);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_timeout);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_error);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_exists);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_new);
Broker::auto_publish(Cluster::worker_topic, NetControl::rule_destroyed);
}
@else
event bro_init()
{
Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_add_rule);
Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_remove_rule);
Broker::auto_publish(Cluster::manager_topic, NetControl::cluster_netcontrol_delete_rule);
}
@endif
function activate(p: PluginState, priority: int)
{

View file

@ -6,8 +6,6 @@ module NetControl;
@load ../plugin
@load base/frameworks/broker
@ifdef ( Broker::__enable )
export {
type AclRule : record {
command: string;
@ -243,7 +241,7 @@ function acld_add_rule_fun(p: PluginState, r: Rule) : bool
if ( ar$command == "" )
return F;
Broker::send_event(p$acld_config$acld_topic, Broker::event_args(acld_add_rule, p$acld_id, r, ar));
Broker::publish(p$acld_config$acld_topic, acld_add_rule, p$acld_id, r, ar);
return T;
}
@ -266,19 +264,20 @@ function acld_remove_rule_fun(p: PluginState, r: Rule, reason: string) : bool
ar$comment = reason;
}
Broker::send_event(p$acld_config$acld_topic, Broker::event_args(acld_remove_rule, p$acld_id, r, ar));
Broker::publish(p$acld_config$acld_topic, acld_remove_rule, p$acld_id, r, ar);
return T;
}
function acld_init(p: PluginState)
{
Broker::enable();
Broker::connect(cat(p$acld_config$acld_host), p$acld_config$acld_port, 1sec);
Broker::subscribe_to_events(p$acld_config$acld_topic);
Broker::peer(cat(p$acld_config$acld_host), p$acld_config$acld_port);
Broker::subscribe(p$acld_config$acld_topic);
}
event Broker::outgoing_connection_established(peer_address: string, peer_port: port, peer_name: string)
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
local peer_address = cat(endpoint$network$address);
local peer_port = endpoint$network$bound_port;
if ( [peer_port, peer_address] !in netcontrol_acld_peers )
# ok, this one was none of ours...
return;
@ -315,5 +314,3 @@ function create_acld(config: AcldConfig) : PluginState
return p;
}
@endif

View file

@ -8,8 +8,6 @@ module NetControl;
@load ../plugin
@load base/frameworks/broker
@ifdef ( Broker::__enable )
export {
## This record specifies the configuration that is passed to :bro:see:`NetControl::create_broker`.
type BrokerConfig: record {
@ -151,7 +149,7 @@ function broker_add_rule_fun(p: PluginState, r: Rule) : bool
if ( ! broker_check_rule(p, r) )
return F;
Broker::send_event(p$broker_config$topic, Broker::event_args(broker_add_rule, p$broker_id, r));
Broker::publish(p$broker_config$topic, Broker::make_event(broker_add_rule, p$broker_id, r));
return T;
}
@ -160,19 +158,20 @@ function broker_remove_rule_fun(p: PluginState, r: Rule, reason: string) : bool
if ( ! broker_check_rule(p, r) )
return F;
Broker::send_event(p$broker_config$topic, Broker::event_args(broker_remove_rule, p$broker_id, r, reason));
Broker::publish(p$broker_config$topic, Broker::make_event(broker_remove_rule, p$broker_id, r, reason));
return T;
}
function broker_init(p: PluginState)
{
Broker::enable();
Broker::connect(cat(p$broker_config$host), p$broker_config$bport, 1sec);
Broker::subscribe_to_events(p$broker_config$topic);
Broker::peer(cat(p$broker_config$host), p$broker_config$bport);
Broker::subscribe(p$broker_config$topic);
}
event Broker::outgoing_connection_established(peer_address: string, peer_port: port, peer_name: string)
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
local peer_address = cat(endpoint$network$address);
local peer_port = endpoint$network$bound_port;
if ( [peer_port, peer_address] !in netcontrol_broker_peers )
return;
@ -219,5 +218,3 @@ function create_broker(config: BrokerConfig, can_expire: bool) : PluginState
return p;
}
@endif

View file

@ -8,14 +8,5 @@
@load ./actions/page
@load ./actions/add-geodata
# The cluster framework must be loaded first.
@load base/frameworks/cluster
@if ( Cluster::is_enabled() )
@load ./cluster
@else
@load ./non-cluster
@endif
# Load here so that it can check whether clustering is enabled.
@load ./actions/pp-alarms

View file

@ -155,9 +155,11 @@ function pretty_print_alarm(out: file, n: Info)
@if ( Cluster::is_enabled() )
pdescr = "local";
if ( n?$src_peer )
pdescr = n$src_peer?$descr ? n$src_peer$descr : fmt("%s", n$src_peer$host);
if ( n?$peer_descr )
pdescr = n$peer_descr;
else if ( n?$peer_name )
pdescr = n$peer_name;
pdescr = fmt("<%s> ", pdescr);
@endif

View file

@ -1,53 +0,0 @@
##! Implements notice functionality across clusters. Worker nodes
##! will disable notice/alarm logging streams and forward notice
##! events to the manager node for logging/processing.
@load ./main
@load base/frameworks/cluster
module Notice;
export {
## This is the event used to transport notices on the cluster.
##
## n: The notice information to be sent to the cluster manager for
## further processing.
global cluster_notice: event(n: Notice::Info);
}
## Manager can communicate notice suppression to workers.
redef Cluster::manager2worker_events += /Notice::begin_suppression/;
## Workers need ability to forward notices to manager.
redef Cluster::worker2manager_events += /Notice::cluster_notice/;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type, identifier: string)
{
local suppress_until = ts + suppress_for;
suppressing[note, identifier] = suppress_until;
}
@endif
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event Notice::cluster_notice(n: Notice::Info)
{
# Raise remotely received notices on the manager
NOTICE(n);
}
@endif
module GLOBAL;
## This is the entry point in the global namespace for the notice framework.
function NOTICE(n: Notice::Info)
{
# Suppress this notice if necessary.
if ( Notice::is_being_suppressed(n) )
return;
if ( Cluster::local_node_type() == Cluster::MANAGER )
Notice::internal_NOTICE(n);
else
# For non-managers, send the notice on to the manager.
event Notice::cluster_notice(n);
}

View file

@ -4,6 +4,8 @@
##! what is bad activity for sites. More extensive documentation about using
##! the notice framework can be found in :doc:`/frameworks/notice`.
@load base/frameworks/cluster
module Notice;
export {
@ -117,9 +119,10 @@ export {
## Associated count, or perhaps a status code.
n: count &log &optional;
## Peer that raised this notice.
src_peer: event_peer &optional;
## Textual description for the peer that raised this notice.
## Name of remote peer that raised this notice.
peer_name: string &optional;
## Textual description for the peer that raised this notice,
## including name, host address and port.
peer_descr: string &log &optional;
## The actions which have been applied to this notice.
@ -316,8 +319,36 @@ export {
##
## n: The record of notice data.
global internal_NOTICE: function(n: Notice::Info);
## This is the event used to transport notices on the cluster.
##
## n: The notice information to be sent to the cluster manager for
## further processing.
global cluster_notice: event(n: Notice::Info);
}
module GLOBAL;
function NOTICE(n: Notice::Info)
{
if ( Notice::is_being_suppressed(n) )
return;
@if ( Cluster::is_enabled() )
if ( Cluster::local_node_type() == Cluster::MANAGER )
Notice::internal_NOTICE(n);
else
{
n$peer_name = n$peer_descr = Cluster::node;
Broker::publish(Cluster::manager_topic, Notice::cluster_notice, n);
}
@else
Notice::internal_NOTICE(n);
@endif
}
module Notice;
# This is used as a hack to implement per-item expiration intervals.
function per_notice_suppression_interval(t: table[Notice::Type, string] of time, idx: any): interval
{
@ -368,24 +399,6 @@ event bro_init() &priority=5
$interv=24hrs, $postprocessor=log_mailing_postprocessor]);
}
# TODO: fix this.
#function notice_tags(n: Notice::Info) : table[string] of string
# {
# local tgs: table[string] of string = table();
# if ( is_remote_event() )
# {
# if ( n$src_peer$descr != "" )
# tgs["es"] = n$src_peer$descr;
# else
# tgs["es"] = fmt("%s/%s", n$src_peer$host, n$src_peer$p);
# }
# else
# {
# tgs["es"] = peer_description;
# }
# return tgs;
# }
function email_headers(subject_desc: string, dest: string): string
{
local header_text = string_cat(
@ -506,11 +519,25 @@ hook Notice::notice(n: Notice::Info) &priority=-5
if ( n?$identifier &&
[n$note, n$identifier] !in suppressing &&
n$suppress_for != 0secs )
{
local suppress_until = n$ts + n$suppress_for;
suppressing[n$note, n$identifier] = suppress_until;
{
event Notice::begin_suppression(n$ts, n$suppress_for, n$note, n$identifier);
}
}
}
event Notice::begin_suppression(ts: time, suppress_for: interval, note: Type,
identifier: string)
{
local suppress_until = ts + suppress_for;
suppressing[note, identifier] = suppress_until;
}
event bro_init()
{
if ( ! Cluster::is_enabled() )
return;
Broker::auto_publish(Cluster::worker_topic, Notice::begin_suppression);
Broker::auto_publish(Cluster::proxy_topic, Notice::begin_suppression);
}
function is_being_suppressed(n: Notice::Info): bool
@ -612,12 +639,6 @@ function apply_policy(n: Notice::Info)
n$dst = n$iconn$resp_h;
}
if ( ! n?$src_peer )
n$src_peer = get_event_peer();
if ( ! n?$peer_descr )
n$peer_descr = n$src_peer?$descr ?
n$src_peer$descr : fmt("%s", n$src_peer$host);
if ( ! n?$email_body_sections )
n$email_body_sections = vector();
if ( ! n?$email_delay_tokens )
@ -652,6 +673,7 @@ function internal_NOTICE(n: Notice::Info)
hook Notice::notice(n);
}
module GLOBAL;
global NOTICE: function(n: Notice::Info);
event Notice::cluster_notice(n: Notice::Info)
{
NOTICE(n);
}

View file

@ -1,14 +0,0 @@
@load ./main
module GLOBAL;
## This is the entry point in the global namespace for the notice framework.
function NOTICE(n: Notice::Info)
{
# Suppress this notice if necessary.
if ( Notice::is_being_suppressed(n) )
return;
Notice::internal_NOTICE(n);
}

View file

@ -13,8 +13,14 @@ export {
global cluster_flow_clear: event(name: string);
}
## Workers need ability to forward commands to manager.
redef Cluster::worker2manager_events += /OpenFlow::cluster_flow_(mod|clear)/;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
# Workers need ability to forward commands to manager.
event bro_init()
{
Broker::auto_publish(Cluster::manager_topic, OpenFlow::cluster_flow_mod);
Broker::auto_publish(Cluster::manager_topic, OpenFlow::cluster_flow_clear);
}
@endif
# the flow_mod function wrapper
function flow_mod(controller: Controller, match: ofp_match, flow_mod: ofp_flow_mod): bool

View file

@ -5,8 +5,6 @@
module OpenFlow;
@ifdef ( Broker::__enable )
export {
redef enum Plugin += {
BROKER,
@ -49,27 +47,28 @@ function broker_describe(state: ControllerState): string
function broker_flow_mod_fun(state: ControllerState, match: ofp_match, flow_mod: OpenFlow::ofp_flow_mod): bool
{
Broker::send_event(state$broker_topic, Broker::event_args(broker_flow_mod, state$_name, state$broker_dpid, match, flow_mod));
Broker::publish(state$broker_topic, Broker::make_event(broker_flow_mod, state$_name, state$broker_dpid, match, flow_mod));
return T;
}
function broker_flow_clear_fun(state: OpenFlow::ControllerState): bool
{
Broker::send_event(state$broker_topic, Broker::event_args(broker_flow_clear, state$_name, state$broker_dpid));
Broker::publish(state$broker_topic, Broker::make_event(broker_flow_clear, state$_name, state$broker_dpid));
return T;
}
function broker_init(state: OpenFlow::ControllerState)
{
Broker::enable();
Broker::connect(cat(state$broker_host), state$broker_port, 1sec);
Broker::subscribe_to_events(state$broker_topic); # openflow success and failure events are directly sent back via the other plugin via broker.
Broker::peer(cat(state$broker_host), state$broker_port);
Broker::subscribe(state$broker_topic); # openflow success and failure events are directly sent back via the other plugin via broker.
}
event Broker::outgoing_connection_established(peer_address: string, peer_port: port, peer_name: string)
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
local peer_address = cat(endpoint$network$address);
local peer_port = endpoint$network$bound_port;
if ( [peer_port, peer_address] !in broker_peers )
# ok, this one was none of ours...
return;
@ -94,5 +93,3 @@ function broker_new(name: string, host: addr, host_port: port, topic: string, dp
return c;
}
@endif

View file

@ -6,6 +6,7 @@
@load base/utils/directions-and-hosts
@load base/utils/numbers
@load base/frameworks/cluster
module Software;
@ -68,8 +69,9 @@ export {
## Hosts whose software should be detected and tracked.
## Choices are: LOCAL_HOSTS, REMOTE_HOSTS, ALL_HOSTS, NO_HOSTS.
const asset_tracking = LOCAL_HOSTS &redef;
## Other scripts should call this function when they detect software.
##
## id: The connection id where the software was discovered.
##
## info: A record representing the software discovered.
@ -102,15 +104,21 @@ export {
## The set of software associated with an address. Data expires from
## this table after one day by default so that a detected piece of
## software will be logged once each day.
global tracked: table[addr] of SoftwareSet
&create_expire=1day
&synchronized
&redef;
## software will be logged once each day. In a cluster, this table is
## uniformly distributed among proxy nodes.
global tracked: table[addr] of SoftwareSet &create_expire=1day;
## This event can be handled to access the :bro:type:`Software::Info`
## record as it is sent on to the logging framework.
global log_software: event(rec: Info);
## This event can be handled to access software information whenever it's
## version is found to have changed.
global version_change: event(old: Info, new: Info);
## This event is raised when software is about to be registered for
## tracking in :bro:see:`Software::tracked`.
global register: event(info: Info);
}
event bro_init() &priority=5
@ -437,63 +445,70 @@ function software_fmt(i: Info): string
return fmt("%s %s", i$name, software_fmt_version(i$version));
}
# Insert a mapping into the table
# Overides old entries for the same software and generates events if needed.
event register(id: conn_id, info: Info)
event Software::register(info: Info)
{
# Host already known?
if ( info$host !in tracked )
tracked[info$host] = table();
local ts: SoftwareSet;
if ( info$host in tracked )
ts = tracked[info$host];
else
ts = tracked[info$host] = SoftwareSet();
local ts = tracked[info$host];
# Software already registered for this host? We don't want to endlessly
# log the same thing.
if ( info$name in ts )
{
local old = ts[info$name];
# If the version hasn't changed, then we're just redetecting the
# same thing, then we don't care. This results in no extra logging.
# But if the $force_log value is set then we'll continue.
if ( ! info$force_log && cmp_versions(old$version, info$version) == 0 )
local changed = cmp_versions(old$version, info$version) != 0;
if ( changed )
event Software::version_change(old, info);
else if ( ! info$force_log )
# If the version hasn't changed, then we're just redetecting the
# same thing, then we don't care.
return;
}
ts[info$name] = info;
Log::write(Software::LOG, info);
}
function found(id: conn_id, info: Info): bool
{
if ( info$force_log || addr_matches_host(info$host, asset_tracking) )
{
if ( !info?$ts )
info$ts=network_time();
if ( info?$version ) # we have a version number and don't have to parse. check if the name is also set...
{
if ( ! info?$name )
{
Reporter::error("Required field name not present in Software::found");
return F;
}
}
else # no version present, we have to parse...
{
if ( !info?$unparsed_version )
{
Reporter::error("No unparsed version string present in Info record with version in Software::found");
return F;
}
local sw = parse(info$unparsed_version);
info$unparsed_version = sw$unparsed_version;
info$name = sw$name;
info$version = sw$version;
}
event register(id, info);
return T;
}
else
if ( ! info$force_log && ! addr_matches_host(info$host, asset_tracking) )
return F;
if ( ! info?$ts )
info$ts = network_time();
if ( info?$version )
{
if ( ! info?$name )
{
Reporter::error("Required field name not present in Software::found");
return F;
}
}
else if ( ! info?$unparsed_version )
{
Reporter::error("No unparsed version string present in Info record with version in Software::found");
return F;
}
if ( ! info?$version )
{
local sw = parse(info$unparsed_version);
info$unparsed_version = sw$unparsed_version;
info$name = sw$name;
info$version = sw$version;
}
@if ( Cluster::is_enabled() )
Cluster::publish_hrw(Cluster::proxy_pool, info$host, Software::register,
info);
@else
event Software::register(info);
@endif
return T;
}

View file

@ -55,18 +55,20 @@ export {
global cluster_threshold_crossed: event(ss_name: string, key: SumStats::Key, thold_index: count);
}
# Add events to the cluster framework to make this work.
redef Cluster::manager2worker_events += /SumStats::cluster_(ss_request|get_result|threshold_crossed)/;
redef Cluster::manager2worker_events += /SumStats::(get_a_key)/;
redef Cluster::worker2manager_events += /SumStats::cluster_(send_result|key_intermediate_response)/;
redef Cluster::worker2manager_events += /SumStats::(send_a_key|send_no_key)/;
# This variable is maintained to know what keys have recently sent or received
# intermediate updates so they don't overwhelm the manager.
global recent_global_view_keys: set[string, Key] &create_expire=1min;
@if ( Cluster::local_node_type() != Cluster::MANAGER )
event bro_init() &priority=100
{
Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_send_result);
Broker::auto_publish(Cluster::manager_topic, SumStats::cluster_key_intermediate_response);
Broker::auto_publish(Cluster::manager_topic, SumStats::send_a_key);
Broker::auto_publish(Cluster::manager_topic, SumStats::send_no_key);
}
# Result tables indexed on a uid that are currently being sent to the
# manager.
global sending_results: table[string] of ResultTable = table() &read_expire=1min;
@ -207,6 +209,14 @@ function request_key(ss_name: string, key: Key): Result
@if ( Cluster::local_node_type() == Cluster::MANAGER )
event bro_init() &priority=100
{
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_ss_request);
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_get_result);
Broker::auto_publish(Cluster::worker_topic, SumStats::cluster_threshold_crossed);
Broker::auto_publish(Cluster::worker_topic, SumStats::get_a_key);
}
# This variable is maintained by manager nodes as they collect and aggregate
# results.
# Index on a uid.