mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Publish ftp_data_expected updates to other workers for synchronization
This commit is contained in:
parent
90771d4aba
commit
d698bddc7d
1 changed files with 54 additions and 1 deletions
|
@ -9,6 +9,7 @@
|
||||||
@load base/utils/paths
|
@load base/utils/paths
|
||||||
@load base/utils/numbers
|
@load base/utils/numbers
|
||||||
@load base/utils/addrs
|
@load base/utils/addrs
|
||||||
|
@load base/frameworks/cluster
|
||||||
|
|
||||||
module FTP;
|
module FTP;
|
||||||
|
|
||||||
|
@ -76,6 +77,19 @@ const directory_cmds = {
|
||||||
["XPWD", 257],
|
["XPWD", 257],
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@if ( Cluster::is_enabled() )
|
||||||
|
function ftp_relay_topic(): string
|
||||||
|
{
|
||||||
|
local rval = Cluster::rr_topic(Cluster::proxy_pool, "ftp_transfer_rr_key");
|
||||||
|
|
||||||
|
if ( rval == "" )
|
||||||
|
# No proxy is alive, so relay via manager instead.
|
||||||
|
return Cluster::manager_topic;
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
@endif
|
||||||
|
|
||||||
function parse_ftp_reply_code(code: count): ReplyCode
|
function parse_ftp_reply_code(code: count): ReplyCode
|
||||||
{
|
{
|
||||||
local a: ReplyCode;
|
local a: ReplyCode;
|
||||||
|
@ -137,6 +151,29 @@ function ftp_message(s: Info)
|
||||||
delete s$data_channel;
|
delete s$data_channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
event sync_add_expected_data(s: Info, chan: ExpectedDataChannel)
|
||||||
|
{
|
||||||
|
@if ( Cluster::local_node_type() == Cluster::PROXY ||
|
||||||
|
Cluster::local_node_type() == Cluster::MANAGER )
|
||||||
|
Broker::publish(Cluster::worker_topic, sync_add_expected_data, s, chan);
|
||||||
|
@else
|
||||||
|
ftp_data_expected[chan$resp_h, chan$resp_p] = s;
|
||||||
|
Analyzer::schedule_analyzer(chan$orig_h, chan$resp_h, chan$resp_p,
|
||||||
|
Analyzer::ANALYZER_FTP_DATA,
|
||||||
|
5mins);
|
||||||
|
@endif
|
||||||
|
}
|
||||||
|
|
||||||
|
event sync_remove_expected_data(resp_h: addr, resp_p: port)
|
||||||
|
{
|
||||||
|
@if ( Cluster::local_node_type() == Cluster::PROXY ||
|
||||||
|
Cluster::local_node_type() == Cluster::MANAGER )
|
||||||
|
Broker::publish(Cluster::worker_topic, sync_remove_expected_data, resp_h, resp_p);
|
||||||
|
@else
|
||||||
|
delete ftp_data_expected[resp_h, resp_p];
|
||||||
|
@endif
|
||||||
|
}
|
||||||
|
|
||||||
function add_expected_data_channel(s: Info, chan: ExpectedDataChannel)
|
function add_expected_data_channel(s: Info, chan: ExpectedDataChannel)
|
||||||
{
|
{
|
||||||
s$passive = chan$passive;
|
s$passive = chan$passive;
|
||||||
|
@ -145,6 +182,9 @@ function add_expected_data_channel(s: Info, chan: ExpectedDataChannel)
|
||||||
Analyzer::schedule_analyzer(chan$orig_h, chan$resp_h, chan$resp_p,
|
Analyzer::schedule_analyzer(chan$orig_h, chan$resp_h, chan$resp_p,
|
||||||
Analyzer::ANALYZER_FTP_DATA,
|
Analyzer::ANALYZER_FTP_DATA,
|
||||||
5mins);
|
5mins);
|
||||||
|
@if ( Cluster::is_enabled() )
|
||||||
|
Broker::publish(ftp_relay_topic(), sync_add_expected_data, s, chan);
|
||||||
|
@endif
|
||||||
}
|
}
|
||||||
|
|
||||||
event ftp_request(c: connection, command: string, arg: string) &priority=5
|
event ftp_request(c: connection, command: string, arg: string) &priority=5
|
||||||
|
@ -179,6 +219,7 @@ event ftp_request(c: connection, command: string, arg: string) &priority=5
|
||||||
|
|
||||||
if ( data$valid )
|
if ( data$valid )
|
||||||
{
|
{
|
||||||
|
# print fmt("ADD(%d) [%s %s] from %s", getpid(), data$h, data$p, c$id);
|
||||||
add_expected_data_channel(c$ftp, [$passive=F, $orig_h=id$resp_h,
|
add_expected_data_channel(c$ftp, [$passive=F, $orig_h=id$resp_h,
|
||||||
$resp_h=data$h, $resp_p=data$p]);
|
$resp_h=data$h, $resp_p=data$p]);
|
||||||
}
|
}
|
||||||
|
@ -229,6 +270,7 @@ event ftp_reply(c: connection, code: count, msg: string, cont_resp: bool) &prior
|
||||||
if ( code == 229 && data$h == [::] )
|
if ( code == 229 && data$h == [::] )
|
||||||
data$h = c$id$resp_h;
|
data$h = c$id$resp_h;
|
||||||
|
|
||||||
|
# print fmt("ADD(%d) [%s %s] from %s", getpid(), data$h, data$p, c$id);
|
||||||
add_expected_data_channel(c$ftp, [$passive=T, $orig_h=c$id$orig_h,
|
add_expected_data_channel(c$ftp, [$passive=T, $orig_h=c$id$orig_h,
|
||||||
$resp_h=data$h, $resp_p=data$p]);
|
$resp_h=data$h, $resp_p=data$p]);
|
||||||
}
|
}
|
||||||
|
@ -264,8 +306,11 @@ event scheduled_analyzer_applied(c: connection, a: Analyzer::Tag) &priority=10
|
||||||
{
|
{
|
||||||
local id = c$id;
|
local id = c$id;
|
||||||
if ( [id$resp_h, id$resp_p] in ftp_data_expected )
|
if ( [id$resp_h, id$resp_p] in ftp_data_expected )
|
||||||
|
{
|
||||||
|
# print fmt("FOUND(%d) [%s %s] from %s", getpid(), id$resp_h, id$resp_p, c$id);
|
||||||
add c$service["ftp-data"];
|
add c$service["ftp-data"];
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
event file_transferred(c: connection, prefix: string, descr: string,
|
event file_transferred(c: connection, prefix: string, descr: string,
|
||||||
mime_type: string) &priority=5
|
mime_type: string) &priority=5
|
||||||
|
@ -273,6 +318,7 @@ event file_transferred(c: connection, prefix: string, descr: string,
|
||||||
local id = c$id;
|
local id = c$id;
|
||||||
if ( [id$resp_h, id$resp_p] in ftp_data_expected )
|
if ( [id$resp_h, id$resp_p] in ftp_data_expected )
|
||||||
{
|
{
|
||||||
|
# print fmt("FXFER(%d) [%s %s] from %s", getpid(), id$resp_h, id$resp_p, c$id);
|
||||||
local s = ftp_data_expected[id$resp_h, id$resp_p];
|
local s = ftp_data_expected[id$resp_h, id$resp_p];
|
||||||
s$mime_type = split_string1(mime_type, /;/)[0];
|
s$mime_type = split_string1(mime_type, /;/)[0];
|
||||||
}
|
}
|
||||||
|
@ -287,7 +333,14 @@ event connection_reused(c: connection) &priority=5
|
||||||
event connection_state_remove(c: connection) &priority=-5
|
event connection_state_remove(c: connection) &priority=-5
|
||||||
{
|
{
|
||||||
if ( c$ftp_data_reuse ) return;
|
if ( c$ftp_data_reuse ) return;
|
||||||
|
if ( [c$id$resp_h, c$id$resp_p] in ftp_data_expected )
|
||||||
|
{
|
||||||
|
# print fmt("CREMOVE(%d) [%s %s] from %s", getpid(), c$id$resp_h, c$id$resp_p, c$id);
|
||||||
delete ftp_data_expected[c$id$resp_h, c$id$resp_p];
|
delete ftp_data_expected[c$id$resp_h, c$id$resp_p];
|
||||||
|
@if ( Cluster::is_enabled() )
|
||||||
|
Broker::publish(ftp_relay_topic(), sync_remove_expected_data, c$id$resp_h, c$id$resp_p);
|
||||||
|
@endif
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Use state remove event to cover connections terminated by RST.
|
# Use state remove event to cover connections terminated by RST.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue