diff --git a/scripts/base/protocols/ftp/main.zeek b/scripts/base/protocols/ftp/main.zeek index 1c2dce17f8..0f21801b69 100644 --- a/scripts/base/protocols/ftp/main.zeek +++ b/scripts/base/protocols/ftp/main.zeek @@ -9,6 +9,7 @@ @load base/utils/paths @load base/utils/numbers @load base/utils/addrs +@load base/frameworks/cluster module FTP; @@ -76,6 +77,19 @@ const directory_cmds = { ["XPWD", 257], }; +@if ( Cluster::is_enabled() ) +function ftp_relay_topic(): string + { + local rval = Cluster::rr_topic(Cluster::proxy_pool, "ftp_transfer_rr_key"); + + if ( rval == "" ) + # No proxy is alive, so relay via manager instead. + return Cluster::manager_topic; + + return rval; + } +@endif + function parse_ftp_reply_code(code: count): ReplyCode { local a: ReplyCode; @@ -137,6 +151,29 @@ function ftp_message(s: Info) delete s$data_channel; } +event sync_add_expected_data(s: Info, chan: ExpectedDataChannel) + { +@if ( Cluster::local_node_type() == Cluster::PROXY || + Cluster::local_node_type() == Cluster::MANAGER ) + Broker::publish(Cluster::worker_topic, sync_add_expected_data, s, chan); +@else + ftp_data_expected[chan$resp_h, chan$resp_p] = s; + Analyzer::schedule_analyzer(chan$orig_h, chan$resp_h, chan$resp_p, + Analyzer::ANALYZER_FTP_DATA, + 5mins); +@endif + } + +event sync_remove_expected_data(resp_h: addr, resp_p: port) + { +@if ( Cluster::local_node_type() == Cluster::PROXY || + Cluster::local_node_type() == Cluster::MANAGER ) + Broker::publish(Cluster::worker_topic, sync_remove_expected_data, resp_h, resp_p); +@else + delete ftp_data_expected[resp_h, resp_p]; +@endif + } + function add_expected_data_channel(s: Info, chan: ExpectedDataChannel) { s$passive = chan$passive; @@ -145,6 +182,9 @@ function add_expected_data_channel(s: Info, chan: ExpectedDataChannel) Analyzer::schedule_analyzer(chan$orig_h, chan$resp_h, chan$resp_p, Analyzer::ANALYZER_FTP_DATA, 5mins); +@if ( Cluster::is_enabled() ) + Broker::publish(ftp_relay_topic(), sync_add_expected_data, s, chan); +@endif } event ftp_request(c: connection, command: string, arg: string) &priority=5 @@ -179,6 +219,7 @@ event ftp_request(c: connection, command: string, arg: string) &priority=5 if ( data$valid ) { + # print fmt("ADD(%d) [%s %s] from %s", getpid(), data$h, data$p, c$id); add_expected_data_channel(c$ftp, [$passive=F, $orig_h=id$resp_h, $resp_h=data$h, $resp_p=data$p]); } @@ -229,6 +270,7 @@ event ftp_reply(c: connection, code: count, msg: string, cont_resp: bool) &prior if ( code == 229 && data$h == [::] ) data$h = c$id$resp_h; + # print fmt("ADD(%d) [%s %s] from %s", getpid(), data$h, data$p, c$id); add_expected_data_channel(c$ftp, [$passive=T, $orig_h=c$id$orig_h, $resp_h=data$h, $resp_p=data$p]); } @@ -264,7 +306,10 @@ event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10 { local id = c$id; if ( [id$resp_h, id$resp_p] in ftp_data_expected ) + { + # print fmt("FOUND(%d) [%s %s] from %s", getpid(), id$resp_h, id$resp_p, c$id); add c$service["ftp-data"]; + } } event file_transferred(c: connection, prefix: string, descr: string, @@ -273,6 +318,7 @@ event file_transferred(c: connection, prefix: string, descr: string, local id = c$id; if ( [id$resp_h, id$resp_p] in ftp_data_expected ) { + # print fmt("FXFER(%d) [%s %s] from %s", getpid(), id$resp_h, id$resp_p, c$id); local s = ftp_data_expected[id$resp_h, id$resp_p]; s$mime_type = split_string1(mime_type, /;/)[0]; } @@ -287,7 +333,14 @@ event connection_reused(c: connection) &priority=5 event connection_state_remove(c: connection) &priority=-5 { if ( c$ftp_data_reuse ) return; - delete ftp_data_expected[c$id$resp_h, c$id$resp_p]; + if ( [c$id$resp_h, c$id$resp_p] in ftp_data_expected ) + { + # print fmt("CREMOVE(%d) [%s %s] from %s", getpid(), c$id$resp_h, c$id$resp_p, c$id); + delete ftp_data_expected[c$id$resp_h, c$id$resp_p]; +@if ( Cluster::is_enabled() ) + Broker::publish(ftp_relay_topic(), sync_remove_expected_data, c$id$resp_h, c$id$resp_p); +@endif + } } # Use state remove event to cover connections terminated by RST.