add pacf plugin that directly outputs messages to broker.

Also fix a few problems in pacf in the process of doing this.
This commit is contained in:
Johanna Amann 2015-05-26 11:19:55 -07:00
parent 94fbd492ca
commit 0a49b8cdf6
10 changed files with 288 additions and 14 deletions

View file

@ -250,11 +250,13 @@ export {
redef record Rule += {
##< Internally set to the plugin handling the rule.
_plugin: PluginState &optional;
_plugin_id: count &optional;
};
global plugins: vector of PluginState;
global plugin_ids: table[count] of PluginState;
global rule_counter: count = 1;
global plugin_counter: count = 1;
global rules: table[count] of Rule;
event bro_init() &priority=5
@ -352,6 +354,10 @@ function activate(p: PluginState, priority: int)
plugins[|plugins|] = p;
sort(plugins, function(p1: PluginState, p2: PluginState) : int { return p2$_priority - p1$_priority; });
plugin_ids[plugin_counter] = p;
p$_id = plugin_counter;
++plugin_counter;
log_msg(fmt("activated plugin with priority %d", priority), p);
}
@ -395,9 +401,11 @@ function add_rule(r: Rule) : count
{
local p = plugins[i];
# set before, in case the plugins sends and regenerates the plugin record later.
r$_plugin_id = p$_id;
if ( p$plugin$add_rule(p, r) )
{
r$_plugin = p;
log_rule(r, "ADD", REQUESTED, p);
return r$id;
}
@ -409,10 +417,16 @@ function add_rule(r: Rule) : count
function remove_rule(id: count) : bool
{
local r = rules[id];
local p = r$_plugin;
if ( id !in rules )
{
Reporter::error(fmt("Rule %d does not exist in Pacf::remove_rule", id));
return F;
}
if ( ! p$plugin$remove_rule(r$_plugin, r) )
local r = rules[id];
local p = plugin_ids[r$_plugin_id];
if ( ! p$plugin$remove_rule(p, r) )
{
log_rule_error(r, "remove failed", p);
return F;
@ -425,7 +439,7 @@ function remove_rule(id: count) : bool
event rule_expire(r: Rule, p: PluginState)
{
if ( r$id !in rules )
# Remove already.
# Removed already.
return;
event rule_timeout(r, FlowInfo(), p);

View file

@ -9,6 +9,9 @@ export {
## Table for a plugin to store custom, instance-specfific state.
config: table[string] of string &default=table();
## Unique plugin identifier -- used for backlookup of plugins from Rules. Set internally.
_id: count &optional;
## Set internally.
_priority: int &default=+0;
};

View file

@ -1,3 +1,4 @@
@load ./debug
@load ./openflow
@load ./packetfilter
@load ./broker

View file

@ -0,0 +1,142 @@
# Broker plugin for the pacf framework. Sends the raw data structures
# used in pacf on to Broker to allow for easy handling, e.g., of
# command-line scripts.
module Pacf;
@load ../plugin
@load base/frameworks/broker
export {
## Instantiates the broker plugin.
global create_broker: function(host: addr, host_port: port, topic: string, can_expire: bool &default=F) : PluginState;
redef record PluginState += {
## The broker topic used to send events to
broker_topic: string &optional;
## The ID of this broker instance - for the mapping to PluginStates
broker_id: count &optional;
## Broker host to connect to
broker_host: addr &optional;
## Broker port to connect to
broker_port: port &optional;
};
global broker_add_rule: event(id: count, r: Rule);
global broker_remove_rule: event(id: count, r: Rule);
global broker_rule_added: event(id: count, r: Rule, msg: string);
global broker_rule_removed: event(id: count, r: Rule, msg: string);
global broker_rule_error: event(id: count, r: Rule, msg: string);
global broker_rule_timeout: event(id: count, r: Rule, i: FlowInfo);
}
global pacf_broker_topics: set[string] = set();
global pacf_broker_id: table[count] of PluginState = table();
global pacf_broker_current_id: count = 0;
event Pacf::broker_rule_added(id: count, r: Rule, msg: string)
{
if ( id !in pacf_broker_id )
{
Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id));
return;
}
local p = pacf_broker_id[id];
event Pacf::rule_added(r, p, msg);
}
event Pacf::broker_rule_removed(id: count, r: Rule, msg: string)
{
if ( id !in pacf_broker_id )
{
Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id));
return;
}
local p = pacf_broker_id[id];
event Pacf::rule_removed(r, p, msg);
}
event Pacf::broker_rule_error(id: count, r: Rule, msg: string)
{
if ( id !in pacf_broker_id )
{
Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id));
return;
}
local p = pacf_broker_id[id];
event Pacf::rule_error(r, p, msg);
}
event Pacf::broker_rule_timeout(id: count, r: Rule, i: FlowInfo)
{
if ( id !in pacf_broker_id )
{
Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id));
return;
}
local p = pacf_broker_id[id];
event Pacf::rule_timeout(r, i, p);
}
function broker_name(p: PluginState) : string
{
return fmt("PACF Broker plugin - topic %s", p$broker_topic);
}
function broker_add_rule_fun(p: PluginState, r: Rule) : bool
{
BrokerComm::event(p$broker_topic, BrokerComm::event_args(broker_add_rule, p$broker_id, r));
return T;
}
function broker_remove_rule_fun(p: PluginState, r: Rule) : bool
{
BrokerComm::event(p$broker_topic, BrokerComm::event_args(broker_remove_rule, p$broker_id, r));
return T;
}
global broker_plugin = Plugin(
$name=broker_name,
$can_expire = F,
$add_rule = broker_add_rule_fun,
$remove_rule = broker_remove_rule_fun
);
global broker_plugin_can_expire = Plugin(
$name=broker_name,
$can_expire = T,
$add_rule = broker_add_rule_fun,
$remove_rule = broker_remove_rule_fun
);
function create_broker(host: addr, host_port: port, topic: string, can_expire: bool &default=F) : PluginState
{
if ( topic in pacf_broker_topics )
Reporter::warning(fmt("Topic %s was added to Pacf broker plugin twice. Possible duplication of commands", topic));
else
add pacf_broker_topics[topic];
local plugin = broker_plugin;
if ( can_expire )
plugin = broker_plugin_can_expire;
local p: PluginState = [$broker_host=host, $broker_port=host_port, $plugin=plugin, $broker_topic=topic, $broker_id=pacf_broker_current_id];
pacf_broker_id[pacf_broker_current_id] = p;
++pacf_broker_current_id;
BrokerComm::enable();
BrokerComm::connect(cat(host), host_port, 1sec);
BrokerComm::subscribe_to_events(topic);
return p;
}

View file

@ -5,6 +5,8 @@
module Pacf;
@load ../plugin
export {
## Instantiates the packetfilter plugin.
global create_packetfilter: function() : PluginState;

View file

@ -87,7 +87,7 @@ export {
## A rule for the framework to put in place. Of all rules currently in
## place, the first match will be taken, sorted by priority. All
## further riles will be ignored.
## further rules will be ignored.
type Rule: record {
ty: RuleType; ##< Type of rule.
target: TargetType; ##< Where to apply rule.
@ -146,8 +146,6 @@ export {
id: count &default=0; ##< Internally determined unique ID for this notification. Will be set when added.
};
}