diff --git a/scripts/base/frameworks/pacf/main.bro b/scripts/base/frameworks/pacf/main.bro index aa2eb742f5..abc50fe428 100644 --- a/scripts/base/frameworks/pacf/main.bro +++ b/scripts/base/frameworks/pacf/main.bro @@ -250,11 +250,13 @@ export { redef record Rule += { ##< Internally set to the plugin handling the rule. - _plugin: PluginState &optional; + _plugin_id: count &optional; }; global plugins: vector of PluginState; +global plugin_ids: table[count] of PluginState; global rule_counter: count = 1; +global plugin_counter: count = 1; global rules: table[count] of Rule; event bro_init() &priority=5 @@ -352,6 +354,10 @@ function activate(p: PluginState, priority: int) plugins[|plugins|] = p; sort(plugins, function(p1: PluginState, p2: PluginState) : int { return p2$_priority - p1$_priority; }); + plugin_ids[plugin_counter] = p; + p$_id = plugin_counter; + ++plugin_counter; + log_msg(fmt("activated plugin with priority %d", priority), p); } @@ -395,9 +401,11 @@ function add_rule(r: Rule) : count { local p = plugins[i]; + # set before, in case the plugins sends and regenerates the plugin record later. + r$_plugin_id = p$_id; + if ( p$plugin$add_rule(p, r) ) { - r$_plugin = p; log_rule(r, "ADD", REQUESTED, p); return r$id; } @@ -409,10 +417,16 @@ function add_rule(r: Rule) : count function remove_rule(id: count) : bool { - local r = rules[id]; - local p = r$_plugin; + if ( id !in rules ) + { + Reporter::error(fmt("Rule %d does not exist in Pacf::remove_rule", id)); + return F; + } - if ( ! p$plugin$remove_rule(r$_plugin, r) ) + local r = rules[id]; + local p = plugin_ids[r$_plugin_id]; + + if ( ! p$plugin$remove_rule(p, r) ) { log_rule_error(r, "remove failed", p); return F; @@ -425,7 +439,7 @@ function remove_rule(id: count) : bool event rule_expire(r: Rule, p: PluginState) { if ( r$id !in rules ) - # Remove already. + # Removed already. return; event rule_timeout(r, FlowInfo(), p); diff --git a/scripts/base/frameworks/pacf/plugin.bro b/scripts/base/frameworks/pacf/plugin.bro index 6c33462a99..cc1b1f0ee6 100644 --- a/scripts/base/frameworks/pacf/plugin.bro +++ b/scripts/base/frameworks/pacf/plugin.bro @@ -9,6 +9,9 @@ export { ## Table for a plugin to store custom, instance-specfific state. config: table[string] of string &default=table(); + ## Unique plugin identifier -- used for backlookup of plugins from Rules. Set internally. + _id: count &optional; + ## Set internally. _priority: int &default=+0; }; diff --git a/scripts/base/frameworks/pacf/plugins/__load__.bro b/scripts/base/frameworks/pacf/plugins/__load__.bro index f3468e0b83..11e8723478 100644 --- a/scripts/base/frameworks/pacf/plugins/__load__.bro +++ b/scripts/base/frameworks/pacf/plugins/__load__.bro @@ -1,3 +1,4 @@ @load ./debug @load ./openflow @load ./packetfilter +@load ./broker diff --git a/scripts/base/frameworks/pacf/plugins/broker.bro b/scripts/base/frameworks/pacf/plugins/broker.bro new file mode 100644 index 0000000000..386d1d2373 --- /dev/null +++ b/scripts/base/frameworks/pacf/plugins/broker.bro @@ -0,0 +1,142 @@ +# Broker plugin for the pacf framework. Sends the raw data structures +# used in pacf on to Broker to allow for easy handling, e.g., of +# command-line scripts. + +module Pacf; + +@load ../plugin +@load base/frameworks/broker + +export { + ## Instantiates the broker plugin. + global create_broker: function(host: addr, host_port: port, topic: string, can_expire: bool &default=F) : PluginState; + + redef record PluginState += { + ## The broker topic used to send events to + broker_topic: string &optional; + ## The ID of this broker instance - for the mapping to PluginStates + broker_id: count &optional; + ## Broker host to connect to + broker_host: addr &optional; + ## Broker port to connect to + broker_port: port &optional; + }; + + global broker_add_rule: event(id: count, r: Rule); + global broker_remove_rule: event(id: count, r: Rule); + + global broker_rule_added: event(id: count, r: Rule, msg: string); + global broker_rule_removed: event(id: count, r: Rule, msg: string); + global broker_rule_error: event(id: count, r: Rule, msg: string); + global broker_rule_timeout: event(id: count, r: Rule, i: FlowInfo); +} + +global pacf_broker_topics: set[string] = set(); +global pacf_broker_id: table[count] of PluginState = table(); +global pacf_broker_current_id: count = 0; + +event Pacf::broker_rule_added(id: count, r: Rule, msg: string) + { + if ( id !in pacf_broker_id ) + { + Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id)); + return; + } + + local p = pacf_broker_id[id]; + + event Pacf::rule_added(r, p, msg); + } + +event Pacf::broker_rule_removed(id: count, r: Rule, msg: string) + { + if ( id !in pacf_broker_id ) + { + Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id)); + return; + } + + local p = pacf_broker_id[id]; + + event Pacf::rule_removed(r, p, msg); + } + +event Pacf::broker_rule_error(id: count, r: Rule, msg: string) + { + if ( id !in pacf_broker_id ) + { + Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id)); + return; + } + + local p = pacf_broker_id[id]; + + event Pacf::rule_error(r, p, msg); + } + +event Pacf::broker_rule_timeout(id: count, r: Rule, i: FlowInfo) + { + if ( id !in pacf_broker_id ) + { + Reporter::error(fmt("Pacf broker plugin with id %d not found, aborting", id)); + return; + } + + local p = pacf_broker_id[id]; + + event Pacf::rule_timeout(r, i, p); + } + +function broker_name(p: PluginState) : string + { + return fmt("PACF Broker plugin - topic %s", p$broker_topic); + } + +function broker_add_rule_fun(p: PluginState, r: Rule) : bool + { + BrokerComm::event(p$broker_topic, BrokerComm::event_args(broker_add_rule, p$broker_id, r)); + return T; + } + +function broker_remove_rule_fun(p: PluginState, r: Rule) : bool + { + BrokerComm::event(p$broker_topic, BrokerComm::event_args(broker_remove_rule, p$broker_id, r)); + return T; + } + +global broker_plugin = Plugin( + $name=broker_name, + $can_expire = F, + $add_rule = broker_add_rule_fun, + $remove_rule = broker_remove_rule_fun + ); + +global broker_plugin_can_expire = Plugin( + $name=broker_name, + $can_expire = T, + $add_rule = broker_add_rule_fun, + $remove_rule = broker_remove_rule_fun + ); + +function create_broker(host: addr, host_port: port, topic: string, can_expire: bool &default=F) : PluginState + { + if ( topic in pacf_broker_topics ) + Reporter::warning(fmt("Topic %s was added to Pacf broker plugin twice. Possible duplication of commands", topic)); + else + add pacf_broker_topics[topic]; + + local plugin = broker_plugin; + if ( can_expire ) + plugin = broker_plugin_can_expire; + + local p: PluginState = [$broker_host=host, $broker_port=host_port, $plugin=plugin, $broker_topic=topic, $broker_id=pacf_broker_current_id]; + + pacf_broker_id[pacf_broker_current_id] = p; + ++pacf_broker_current_id; + + BrokerComm::enable(); + BrokerComm::connect(cat(host), host_port, 1sec); + BrokerComm::subscribe_to_events(topic); + + return p; + } diff --git a/scripts/base/frameworks/pacf/plugins/packetfilter.bro b/scripts/base/frameworks/pacf/plugins/packetfilter.bro index dbf6ba1136..116151eeb4 100644 --- a/scripts/base/frameworks/pacf/plugins/packetfilter.bro +++ b/scripts/base/frameworks/pacf/plugins/packetfilter.bro @@ -5,6 +5,8 @@ module Pacf; +@load ../plugin + export { ## Instantiates the packetfilter plugin. global create_packetfilter: function() : PluginState; diff --git a/scripts/base/frameworks/pacf/types.bro b/scripts/base/frameworks/pacf/types.bro index ac8bb06a35..723ac788c5 100644 --- a/scripts/base/frameworks/pacf/types.bro +++ b/scripts/base/frameworks/pacf/types.bro @@ -87,7 +87,7 @@ export { ## A rule for the framework to put in place. Of all rules currently in ## place, the first match will be taken, sorted by priority. All - ## further riles will be ignored. + ## further rules will be ignored. type Rule: record { ty: RuleType; ##< Type of rule. target: TargetType; ##< Where to apply rule. @@ -146,8 +146,6 @@ export { id: count &default=0; ##< Internally determined unique ID for this notification. Will be set when added. }; + } - - - diff --git a/testing/btest/Baseline/scripts.base.frameworks.pacf.basic/.stdout b/testing/btest/Baseline/scripts.base.frameworks.pacf.basic/.stdout index e01d62243e..da1794833d 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.pacf.basic/.stdout +++ b/testing/btest/Baseline/scripts.base.frameworks.pacf.basic/.stdout @@ -1,4 +1,4 @@ -pacf debug (Debug-All): add_rule: [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=30.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin=] -pacf debug (Debug-All): add_rule: [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=15.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin=] -pacf debug (Debug-All): remove_rule: [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=15.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin=[config={\x0a\x09[all] = 1\x0a}, _priority=0, plugin=[name=Pacf::debug_name\x0a{ \x0areturn (fmt(Debug-%s, (Pacf::do_something(Pacf::p) ? All : None)));\x0a}, can_expire=F, init=Pacf::debug_init\x0a{ \x0aPacf::debug_log(Pacf::p, init);\x0a}, done=Pacf::debug_done\x0a{ \x0aPacf::debug_log(Pacf::p, init);\x0a}, add_rule=Pacf::debug_add_rule\x0a{ \x0aPacf::s = fmt(add_rule: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0aif (Pacf::do_something(Pacf::p)) \x0a\x09{ \x0a\x09event Pacf::rule_added(Pacf::r, Pacf::p, );\x0a\x09return (T);\x0a\x09}\x0a\x0areturn (F);\x0a}, remove_rule=Pacf::debug_remove_rule\x0a{ \x0aPacf::s = fmt(remove_rule: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0aevent Pacf::rule_removed(Pacf::r, Pacf::p, );\x0areturn (T);\x0a}, add_notification=Pacf::debug_add_notification\x0a{ \x0aPacf::s = fmt(add_notification: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0aif (Pacf::do_something(Pacf::p)) \x0a\x09{ \x0a\x09event Pacf::notification_added(Pacf::r, Pacf::p, );\x0a\x09return (T);\x0a\x09}\x0a\x0areturn (F);\x0a}, remove_notification=Pacf::debug_remove_notification\x0a{ \x0aPacf::s = fmt(remove_notification: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0areturn (Pacf::do_something(Pacf::p));\x0a}, transaction_begin=Pacf::debug_transaction_begin\x0a{ \x0aPacf::debug_log(Pacf::p, transaction_begin);\x0a}, transaction_end=Pacf::debug_transaction_end\x0a{ \x0aPacf::debug_log(Pacf::p, transaction_end);\x0a}], of_controller=, of_config=]] -pacf debug (Debug-All): remove_rule: [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=30.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin=[config={\x0a\x09[all] = 1\x0a}, _priority=0, plugin=[name=Pacf::debug_name\x0a{ \x0areturn (fmt(Debug-%s, (Pacf::do_something(Pacf::p) ? All : None)));\x0a}, can_expire=F, init=Pacf::debug_init\x0a{ \x0aPacf::debug_log(Pacf::p, init);\x0a}, done=Pacf::debug_done\x0a{ \x0aPacf::debug_log(Pacf::p, init);\x0a}, add_rule=Pacf::debug_add_rule\x0a{ \x0aPacf::s = fmt(add_rule: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0aif (Pacf::do_something(Pacf::p)) \x0a\x09{ \x0a\x09event Pacf::rule_added(Pacf::r, Pacf::p, );\x0a\x09return (T);\x0a\x09}\x0a\x0areturn (F);\x0a}, remove_rule=Pacf::debug_remove_rule\x0a{ \x0aPacf::s = fmt(remove_rule: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0aevent Pacf::rule_removed(Pacf::r, Pacf::p, );\x0areturn (T);\x0a}, add_notification=Pacf::debug_add_notification\x0a{ \x0aPacf::s = fmt(add_notification: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0aif (Pacf::do_something(Pacf::p)) \x0a\x09{ \x0a\x09event Pacf::notification_added(Pacf::r, Pacf::p, );\x0a\x09return (T);\x0a\x09}\x0a\x0areturn (F);\x0a}, remove_notification=Pacf::debug_remove_notification\x0a{ \x0aPacf::s = fmt(remove_notification: %s, Pacf::r);\x0aPacf::debug_log(Pacf::p, Pacf::s);\x0areturn (Pacf::do_something(Pacf::p));\x0a}, transaction_begin=Pacf::debug_transaction_begin\x0a{ \x0aPacf::debug_log(Pacf::p, transaction_begin);\x0a}, transaction_end=Pacf::debug_transaction_end\x0a{ \x0aPacf::debug_log(Pacf::p, transaction_end);\x0a}], of_controller=, of_config=]] +pacf debug (Debug-All): add_rule: [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=30.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin_id=] +pacf debug (Debug-All): add_rule: [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=15.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin_id=] +pacf debug (Debug-All): remove_rule: [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=15.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin_id=1] +pacf debug (Debug-All): remove_rule: [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=30.0 secs, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin_id=1] diff --git a/testing/btest/Baseline/scripts.base.frameworks.pacf.broker/recv.recv.out b/testing/btest/Baseline/scripts.base.frameworks.pacf.broker/recv.recv.out new file mode 100644 index 0000000000..922b649a98 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.pacf.broker/recv.recv.out @@ -0,0 +1,5 @@ +BrokerComm::incoming_connection_established +add_rule, 0, [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin_id=1] +add_rule, 0, [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin_id=1] +remove_rule, 0, [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin_id=1] +remove_rule, 0, [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin_id=1] diff --git a/testing/btest/Baseline/scripts.base.frameworks.pacf.broker/send.send.out b/testing/btest/Baseline/scripts.base.frameworks.pacf.broker/send.send.out new file mode 100644 index 0000000000..dab0129d5d --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.pacf.broker/send.send.out @@ -0,0 +1,7 @@ +BrokerComm::outgoing_connection_established, 127.0.0.1, 9999/tcp +rule added, [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin_id=1] +rule added, [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin_id=1] +ruke timeout, [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin_id=1], [duration=, packet_count=, byte_count=] +rule removed, [ty=Pacf::DROP, target=Pacf::MONITOR, entity=[ty=Pacf::FLOW, conn=, flow=[src_h=10.10.1.4/32, src_p=1470/tcp, dst_h=74.53.140.153/32, dst_p=25/tcp, src_m=, dst_m=], ip=, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=2, _plugin_id=1] +ruke timeout, [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin_id=1], [duration=, packet_count=, byte_count=] +rule removed, [ty=Pacf::DROP, target=Pacf::FORWARD, entity=[ty=Pacf::ADDRESS, conn=, flow=, ip=10.10.1.4/32, mac=], expire=36000.0, priority=0, location=, c=, i=, d=, s=, mod=, id=3, _plugin_id=1] diff --git a/testing/btest/scripts/base/frameworks/pacf/broker.bro b/testing/btest/scripts/base/frameworks/pacf/broker.bro new file mode 100644 index 0000000000..a3a59e5acb --- /dev/null +++ b/testing/btest/scripts/base/frameworks/pacf/broker.bro @@ -0,0 +1,102 @@ +# @TEST-SERIALIZE: brokercomm +# @TEST-REQUIRES: grep -q ENABLE_BROKER $BUILD/CMakeCache.txt +# @TEST-EXEC: btest-bg-run recv "bro -b ../recv.bro broker_port=$BROKER_PORT >recv.out" +# @TEST-EXEC: btest-bg-run send "bro -b -r $TRACES/smtp.trace --pseudo-realtime ../send.bro broker_port=$BROKER_PORT >send.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff send/send.out + +@TEST-START-FILE send.bro + +@load base/frameworks/pacf + +const broker_port: port &redef; +redef exit_only_after_terminate = T; + +event bro_init() + { + suspend_processing(); + local pacf_broker = Pacf::create_broker(127.0.0.1, broker_port, "bro/event/pacftest", T); + Pacf::activate(pacf_broker, 0); + } + +event BrokerComm::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + print "BrokerComm::outgoing_connection_established", peer_address, peer_port; + continue_processing(); + } + +event BrokerComm::outgoing_connection_broken(peer_address: string, + peer_port: port) + { + terminate(); + } + +event connection_established(c: connection) + { + local id = c$id; + Pacf::shunt_flow([$src_h=id$orig_h, $src_p=id$orig_p, $dst_h=id$resp_h, $dst_p=id$resp_p], 10hrs); + Pacf::drop_address(id$orig_h, 10hrs); + } + +event Pacf::rule_added(r: Pacf::Rule, p: Pacf::PluginState, msg: string) + { + print "rule added", r; + Pacf::remove_rule(r$id); + } + +event Pacf::rule_removed(r: Pacf::Rule, p: Pacf::PluginState, msg: string) + { + print "rule removed", r; + } + +event Pacf::rule_timeout(r: Pacf::Rule, i: Pacf::FlowInfo, p: Pacf::PluginState) + { + print "ruke timeout", r, i; + } + +@TEST-END-FILE + +@TEST-START-FILE recv.bro + +@load base/frameworks/pacf +@load base/frameworks/broker + +const broker_port: port &redef; +redef exit_only_after_terminate = T; + +event bro_init() + { + BrokerComm::enable(); + BrokerComm::subscribe_to_events("bro/event/pacftest"); + BrokerComm::listen(broker_port, "127.0.0.1"); + } + +event BrokerComm::incoming_connection_established(peer_name: string) + { + print "BrokerComm::incoming_connection_established"; + } + +event Pacf::broker_add_rule(id: count, r: Pacf::Rule) + { + print "add_rule", id, r; + + BrokerComm::event("bro/event/pacftest", BrokerComm::event_args(Pacf::broker_rule_added, id, r, "")); + } + +event Pacf::broker_remove_rule(id: count, r: Pacf::Rule) + { + print "remove_rule", id, r; + + BrokerComm::event("bro/event/pacftest", BrokerComm::event_args(Pacf::broker_rule_timeout, id, r, Pacf::FlowInfo())); + BrokerComm::event("bro/event/pacftest", BrokerComm::event_args(Pacf::broker_rule_removed, id, r, "")); + + if ( r$id == 3 ) + terminate(); + } + +@TEST-END-FILE +