From e0086005f8ff52f99a11e26169b0a35434e610ee Mon Sep 17 00:00:00 2001 From: Seth Hall Date: Wed, 25 Apr 2012 17:12:12 -0400 Subject: [PATCH] Checkpoint on the packet filter framework. - Packet loss interval changed to 5 minutes by default. Users were getting too many notices from this. - BPF load balancing (ipv4 and ipv6). This will tie in with upcoming BroControl support for configuring this. - BPF based connection sampling. - Small improvements to how and when filters are installed. --- .../frameworks/packet-filter/__load__.bro | 1 + .../frameworks/packet-filter/load-balance.bro | 120 ++++++++++++++++ .../base/frameworks/packet-filter/main.bro | 54 +++++-- .../frameworks/packet-filter/netstats.bro | 2 +- .../base/frameworks/packet-filter/shunt.bro | 136 +++++++++++++++--- .../base/frameworks/packet-filter/utils.bro | 17 ++- 6 files changed, 296 insertions(+), 34 deletions(-) create mode 100644 scripts/base/frameworks/packet-filter/load-balance.bro diff --git a/scripts/base/frameworks/packet-filter/__load__.bro b/scripts/base/frameworks/packet-filter/__load__.bro index 45c2488c00..14da4e4893 100644 --- a/scripts/base/frameworks/packet-filter/__load__.bro +++ b/scripts/base/frameworks/packet-filter/__load__.bro @@ -1,4 +1,5 @@ @load ./utils @load ./main @load ./shunt +@load ./load-balance @load ./netstats diff --git a/scripts/base/frameworks/packet-filter/load-balance.bro b/scripts/base/frameworks/packet-filter/load-balance.bro new file mode 100644 index 0000000000..105a37b617 --- /dev/null +++ b/scripts/base/frameworks/packet-filter/load-balance.bro @@ -0,0 +1,120 @@ +##! This script implements an automated BPF based load balancing solution for Bro clusters. +##! It is completely automated when multiple worker processes are configured for a single +##! interface on a host. One caveat is that in order for this script to work, your traffic +##! can't have any headers above the Ethernet header (vlan, mpls). + +@load base/frameworks/cluster +@load base/frameworks/packet-filter + +module PacketFilter; + +export { + redef record Cluster::Node += { + ## A BPF filter for load balancing traffic sniffed on a single interface + ## across a number of processes. In normal uses, this will be assigned + ## dynamically by the manager and installed by the workers. + lb_filter: string &optional; + }; + + ## Control if BPF based load balancing is enabled on cluster deployments. + const enable_BPF_load_balancing = F &redef; + + # Configure the cluster framework to enable the load balancing filter configuration. + #global send_filter: event(for_node: string, filter: string); + #global confirm_filter_installation: event(success: bool); +} + +#redef Cluster::manager2worker_events += /LoadBalancing::send_filter/; +#redef Cluster::worker2manager_events += /LoadBalancing::confirm_filter_installation/; + +@if ( Cluster::is_enabled() ) + +@if ( Cluster::local_node_type() == Cluster::MANAGER ) + +event bro_init() &priority=5 + { + if ( ! enable_BPF_load_balancing ) + return; + + local worker_ip_interface: table[addr, string] of count = table(); + for ( n in Cluster::nodes ) + { + local this_node = Cluster::nodes[n]; + + # Only workers! + if ( this_node$node_type != Cluster::WORKER || + ! this_node?$interface ) + next; + + if ( [this_node$ip, this_node$interface] !in worker_ip_interface ) + worker_ip_interface[this_node$ip, this_node$interface] = 0; + ++worker_ip_interface[this_node$ip, this_node$interface]; + } + + # Now that we've counted up how many processes are running on an interface + # let's create the filters for each worker. + local lb_proc_track: table[addr, string] of count = table(); + for ( no in Cluster::nodes ) + { + local that_node = Cluster::nodes[no]; + if ( that_node$node_type == Cluster::WORKER && + that_node?$interface && [that_node$ip, that_node$interface] in worker_ip_interface ) + { + if ( [that_node$ip, that_node$interface] !in lb_proc_track ) + lb_proc_track[that_node$ip, that_node$interface] = 0; + + local this_lb_proc = lb_proc_track[that_node$ip, that_node$interface]; + local total_lb_procs = worker_ip_interface[that_node$ip, that_node$interface]; + + ++lb_proc_track[that_node$ip, that_node$interface]; + if ( total_lb_procs > 1 ) + { + that_node$lb_filter = PacketFilter::sample_filter(total_lb_procs, this_lb_proc); + Communication::nodes[no]$capture_filter = that_node$lb_filter; + } + } + } + } + +#event remote_connection_established(p: event_peer) &priority=-5 +# { +# if ( is_remote_event() ) +# return; +# +# local for_node = p$descr; +# # Send the filter to the peer. +# if ( for_node in Cluster::nodes && +# Cluster::nodes[for_node]?$lb_filter ) +# { +# local filter = Cluster::nodes[for_node]$lb_filter; +# event LoadBalancing::send_filter(for_node, filter); +# } +# } + +#event LoadBalancing::confirm_filter_installation(success: bool) +# { +# # This doesn't really matter yet since we aren't getting back a meaningful success response. +# } + +@endif + + +@if ( Cluster::local_node_type() == Cluster::WORKER ) + +#event LoadBalancing::send_filter(for_node: string, filter: string) +event remote_capture_filter(p: event_peer, filter: string) + { + #if ( for_node !in Cluster::nodes ) + # return; + # + #if ( Cluster::node == for_node ) + # { + restrict_filters["lb_filter"] = filter; + PacketFilter::install(); + #event LoadBalancing::confirm_filter_installation(T); + # } + } + +@endif + +@endif diff --git a/scripts/base/frameworks/packet-filter/main.bro b/scripts/base/frameworks/packet-filter/main.bro index 66a557f53d..6e839c9210 100644 --- a/scripts/base/frameworks/packet-filter/main.bro +++ b/scripts/base/frameworks/packet-filter/main.bro @@ -55,6 +55,11 @@ export { ## dynamically built filter. const unrestricted_filter = "" &redef; + ## Filter string which is unconditionally and'ed to the beginning of every + ## dynamically built filter. This is mostly used when a custom filter is being + ## used but MPLS or VLAN tags are on the traffic. + const restricted_filter = "" &redef; + ## The maximum amount of time that you'd like to allow for filters to compile. ## If this time is exceeded, compensation measures may be taken by the framework ## to reduce the filter size. This threshold being crossed also results in @@ -69,7 +74,7 @@ export { ## ## filter: A BPF expression of traffic that should be excluded. ## - ## Returns: A boolean value to indicate if the fitler was successfully + ## Returns: A boolean value to indicate if the filter was successfully ## installed or not. global exclude: function(filter_id: string, filter: string): bool; @@ -90,7 +95,7 @@ export { ## Call this function to build and install a new dynamically built ## packet filter. - global install: function(); + global install: function(): bool; ## A data structure to represent filter generating factories. type FilterFactory: record { @@ -121,6 +126,9 @@ redef capture_filters += { ["default"] = default_capture_filter }; # install the filter. global currently_building = F; +# Internal tracking for if the the filter being built has possibly been changed. +global filter_changed = F; + global filter_factories: set[FilterFactory] = {}; redef enum PcapFilterID += { @@ -139,7 +147,17 @@ function test_filter(filter: string): bool return T; } -event bro_init() &priority=6 +# This tracks any changes for filtering mechanisms that play along nice +# and set filter_changed to T. +event filter_change_tracking() + { + if ( filter_changed ) + install(); + + schedule 5min { filter_change_tracking() }; + } + +event bro_init() &priority=5 { Log::create_stream(PacketFilter::LOG, [$columns=Info]); @@ -155,8 +173,13 @@ event bro_init() &priority=6 if ( ! test_filter(restrict_filters[id]) ) Reporter::fatal(fmt("Invalid restrict filter named '%s' - '%s'", id, restrict_filters[id])); } + } +event bro_init() &priority=-5 + { install(); + + event filter_change_tracking(); } function register_filter_factory(ff: FilterFactory) @@ -233,27 +256,35 @@ function build(): string if ( unrestricted_filter != "" ) filter = combine_filters(unrestricted_filter, "or", filter); + if ( restricted_filter != "" ) + filter = combine_filters(restricted_filter, "and", filter); currently_building = F; return filter; } -function install() +function install(): bool { if ( currently_building ) - return; + return F; - current_filter = build(); + local tmp_filter = build(); #local ts = current_time(); - if ( ! precompile_pcap_filter(DefaultPcapFilter, current_filter) ) + if ( ! precompile_pcap_filter(DefaultPcapFilter, tmp_filter) ) { NOTICE([$note=Compile_Failure, $msg=fmt("Compiling packet filter failed"), - $sub=current_filter]); - Reporter::fatal(fmt("Bad pcap filter '%s'", current_filter)); + $sub=tmp_filter]); + if ( network_time() == 0.0 ) + Reporter::fatal(fmt("Bad pcap filter '%s'", tmp_filter)); + else + Reporter::warning(fmt("Bad pcap filter '%s'", tmp_filter)); } + # Set it to the current filter if it passed precompiling + current_filter = tmp_filter; + #local diff = current_time()-ts; #if ( diff > max_filter_compile_time ) # NOTICE([$note=Too_Long_To_Compile_Filter, @@ -278,8 +309,11 @@ function install() $msg=fmt("Installing packet filter failed"), $sub=current_filter]); } - if ( reading_live_traffic() || reading_traces() ) Log::write(PacketFilter::LOG, info); + + # Update the filter change tracking + filter_changed = F; + return T; } diff --git a/scripts/base/frameworks/packet-filter/netstats.bro b/scripts/base/frameworks/packet-filter/netstats.bro index 9fbaa5cd1d..b5ffe24f54 100644 --- a/scripts/base/frameworks/packet-filter/netstats.bro +++ b/scripts/base/frameworks/packet-filter/netstats.bro @@ -13,7 +13,7 @@ export { }; ## This is the interval between individual statistics collection. - const stats_collection_interval = 10secs; + const stats_collection_interval = 5min; } event net_stats_update(last_stat: NetStats) diff --git a/scripts/base/frameworks/packet-filter/shunt.bro b/scripts/base/frameworks/packet-filter/shunt.bro index b001da0640..5527592642 100644 --- a/scripts/base/frameworks/packet-filter/shunt.bro +++ b/scripts/base/frameworks/packet-filter/shunt.bro @@ -3,45 +3,73 @@ module PacketFilter; export { + ## The maximum number of BPF based shunts that Bro is allowed to perform. const max_bpf_shunts = 100 &redef; - + + ## Call this function to use BPF to shunt a connection (to prevent the + ## data packets from reaching Bro). For TCP connections, control packets + ## are still allowed through so that Bro can continue logging the connection + ## and it can stop shunting once the connection ends. global shunt_conn: function(id: conn_id): bool; + ## This function will use a BPF expresssion to shunt traffic between + ## the two hosts given in the `conn_id` so that the traffic is never + ## exposed to Bro's traffic processing. + global shunt_host_pair: function(id: conn_id): bool; + + ## Remove shunting for a host pair given as a `conn_id`. The filter + ## is not immediately removed. It waits for the occassional filter + ## update done by the `PacketFilter` framework. + global unshunt_host_pair: function(id: conn_id): bool; + + ## Performs the same function as the `unshunt_host_pair` function, but + ## it forces an immediate filter update. + global force_unshunt_host_pair: function(id: conn_id): bool; + + ## Retrieve the currently shunted connections. + global current_shunted_conns: function(): set[conn_id]; + + ## Retrieve the currently shunted host pairs. + global current_shunted_host_pairs: function(): set[conn_id]; + redef enum Notice::Type += { ## Indicative that :bro:id:`max_bpf_shunts` connections are already ## being shunted with BPF filters and no more are allowed. No_More_Conn_Shunts_Available, + + ## Limitations in BPF make shunting some connections with BPF impossible. + ## This notice encompasses those various cases. + Cannot_BPF_Shunt_Conn, }; } global shunted_conns: set[conn_id]; -global shunted_conns_non_flag_tracking: set[conn_id]; +global shunted_host_pairs: set[conn_id]; function conn_shunt_filters() { - # TODO: this could wrongly match if a connection happens with the ports reversed. - local filter = ""; - local ipv4_tcp_filter = ""; + # NOTE: this could wrongly match if a connection happens with the ports reversed. + local tcp_filter = "tcp and tcp[tcpflags] & (tcp-syn|tcp-fin|tcp-rst) == 0"; + local udp_filter = ""; for ( id in shunted_conns ) { local prot = get_port_transport_proto(id$resp_p); - # TODO: add ipv6 - #if ( prot == udp ) #|| is_ipv6_addr(id$orig_h) ) - # { - # next; - # shunt_for() - # } - - if ( prot == tcp ) - ipv4_tcp_filter = combine_filters(ipv4_tcp_filter, "and", fmt("host %s and port %d and host %s and port %d and %s", id$orig_h, id$orig_p, id$resp_h, id$resp_p, prot)); + local filt = fmt("host %s and port %d and host %s and port %d", id$orig_h, id$orig_p, id$resp_h, id$resp_p); + if ( prot == udp ) + udp_filter = combine_filters(udp_filter, "and", filt); + else if ( prot == tcp ) + tcp_filter = combine_filters(tcp_filter, "and", filt); } + local conn_shunt_filter = combine_filters(tcp_filter, "and", udp_filter); - ipv4_tcp_filter = combine_filters(ipv4_tcp_filter, "and", "tcp[tcpflags] & (tcp-syn|tcp-fin|tcp-rst) == 0"); + for ( id in shunted_host_pairs ) + { + local hp_filter = fmt("host %s and host %s", id$orig_h, id$resp_h); + + } - if ( ipv4_tcp_filter == "" ) - return; - PacketFilter::exclude("conn_shunt_filters", ipv4_tcp_filter); + PacketFilter::exclude("conn_shunt_filters", conn_shunt_filter); } event bro_init() &priority=5 @@ -51,15 +79,79 @@ event bro_init() &priority=5 ]); } -function shunt_conn(id: conn_id): bool +function current_shunted_conns(): set[conn_id] { - if ( |shunted_conns| + |shunted_conns_non_flag_tracking| > max_bpf_shunts ) + return shunted_conns; + } + +function current_shunted_host_pairs(): set[conn_id] + { + return shunted_host_pairs; + } + +function reached_max_shunts(): bool + { + if ( |shunted_conns| + |shunted_host_pairs| > max_bpf_shunts ) { NOTICE([$note=No_More_Conn_Shunts_Available, $msg=fmt("%d BPF shunts are in place and no more will be added until space clears.", max_bpf_shunts)]); + return T; + } + else + return F; + } + +function shunt_host_pair(id: conn_id): bool + { + PacketFilter::filter_changed = T; + + if ( reached_max_shunts() ) + return F; + + add shunted_host_pairs[id]; + install(); + return T; + } + +function unshunt_host_pair(id: conn_id): bool + { + PacketFilter::filter_changed = T; + + if ( id in shunted_host_pairs ) + { + delete shunted_host_pairs[id]; + return T; + } + else + return F; + } + +function force_unshunt_host_pair(id: conn_id): bool + { + if ( unshunt_host_pair(id) ) + { + install(); + return T; + } + else + return F; + } + +function shunt_conn(id: conn_id): bool + { + if ( is_v6_addr(id$orig_h) ) + { + NOTICE([$note=Cannot_BPF_Shunt_Conn, + $msg="IPv6 connections can't be shunted with BPF due to limitations in BPF", + $sub="ipv6_conn", + $id=id, $identifier=string_cat(id)]); return F; } + if ( reached_max_shunts() ) + return F; + + PacketFilter::filter_changed = T; add shunted_conns[id]; install(); return T; @@ -67,8 +159,8 @@ function shunt_conn(id: conn_id): bool event connection_state_remove(c: connection) &priority=-5 { - # Don't rebuild the filter right away because the packet filter framework will check every few minutes - # and update the filter if things have changed. + # Don't rebuild the filter right away because the packet filter framework + # will check every few minutes and update the filter if things have changed. if ( c$id in shunted_conns ) delete shunted_conns[c$id]; } \ No newline at end of file diff --git a/scripts/base/frameworks/packet-filter/utils.bro b/scripts/base/frameworks/packet-filter/utils.bro index 6ee2993050..242d30e45a 100644 --- a/scripts/base/frameworks/packet-filter/utils.bro +++ b/scripts/base/frameworks/packet-filter/utils.bro @@ -9,6 +9,13 @@ export { ## Returns: A valid BPF filter string for matching the port. global port_to_bpf: function(p: port): string; + ## Create a BPF filter to sample IPv4 and IPv6 traffic. + ## + ## num_parts: The number of parts the traffic should be split into. + ## + ## this_part: The part of the traffic this filter will accept. 0-based. + global sampling_filter: function(num_parts: count, this_part: count): string; + ## Combines two valid BPF filter strings with a string based operator ## to form a new filter. ## @@ -40,4 +47,12 @@ function combine_filters(lfilter: string, op: string, rfilter: string): string return lfilter; else return fmt("(%s) %s (%s)", lfilter, op, rfilter); - } \ No newline at end of file + } + +function sampling_filter(num_parts: count, this_part: count): string + { + local v4_filter = fmt("ip and ((ip[14:2]+ip[18:2]) - (%d*((ip[14:2]+ip[18:2])/%d)) == %d)", num_parts, num_parts, this_part); + # TODO: this is probably a fairly suboptimal filter, but it should work for now. + local v6_filter = fmt("ip6 and ((ip6[22:2]+ip6[38:2]) - (%d*((ip6[22:2]+ip6[38:2])/%d)) == %d)", num_parts, num_parts, this_part); + return combine_filters(v4_filter, "or", v6_filter); + }