mirror of
https://github.com/zeek/zeek.git
synced 2025-10-05 08:08:19 +00:00
Communication fix and extension.
- Removing unnecessary log flushing. Closes #498. - Adding new BiF disconnect() that shuts a connection to a peer down. - terminate_connection() now first flushes any still buffered log messages.
This commit is contained in:
parent
3b7806379c
commit
d289db34db
5 changed files with 36 additions and 2 deletions
|
@ -706,6 +706,21 @@ RemoteSerializer::PeerID RemoteSerializer::Connect(addr_type ip, uint16 port,
|
||||||
return p->id;
|
return p->id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RemoteSerializer::CloseConnection(PeerID id)
|
||||||
|
{
|
||||||
|
if ( ! using_communication )
|
||||||
|
return true;
|
||||||
|
|
||||||
|
Peer* peer = LookupPeer(id, true);
|
||||||
|
if ( ! peer )
|
||||||
|
{
|
||||||
|
reporter->Error(fmt("unknown peer id %d for closing connection", int(id)));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return CloseConnection(peer);
|
||||||
|
}
|
||||||
|
|
||||||
bool RemoteSerializer::CloseConnection(Peer* peer)
|
bool RemoteSerializer::CloseConnection(Peer* peer)
|
||||||
{
|
{
|
||||||
if ( peer->suspended_processing )
|
if ( peer->suspended_processing )
|
||||||
|
@ -1288,7 +1303,14 @@ void RemoteSerializer::SendFinalSyncPoint()
|
||||||
|
|
||||||
bool RemoteSerializer::Terminate()
|
bool RemoteSerializer::Terminate()
|
||||||
{
|
{
|
||||||
|
loop_over_list(peers, i)
|
||||||
|
{
|
||||||
|
FlushPrintBuffer(peers[i]);
|
||||||
|
FlushLogBuffer(peers[i]);
|
||||||
|
}
|
||||||
|
|
||||||
Log(LogInfo, fmt("terminating..."));
|
Log(LogInfo, fmt("terminating..."));
|
||||||
|
|
||||||
return terminating = SendToChild(MSG_TERMINATE, 0, 0);
|
return terminating = SendToChild(MSG_TERMINATE, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2552,6 +2574,8 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st
|
||||||
|
|
||||||
len = fmt.EndWrite(&data);
|
len = fmt.EndWrite(&data);
|
||||||
|
|
||||||
|
assert(len > 10);
|
||||||
|
|
||||||
// Do we have enough space in the buffer? If not, flush first.
|
// Do we have enough space in the buffer? If not, flush first.
|
||||||
if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) )
|
if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) )
|
||||||
{
|
{
|
||||||
|
@ -2568,7 +2592,6 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st
|
||||||
peer->log_buffer_used += len;
|
peer->log_buffer_used += len;
|
||||||
assert(peer->log_buffer_used <= LOG_BUFFER_SIZE);
|
assert(peer->log_buffer_used <= LOG_BUFFER_SIZE);
|
||||||
|
|
||||||
FlushLogBuffer(peer); // FIXME: This should go away, but then the unit test fails. See #498.
|
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
|
|
|
@ -34,6 +34,9 @@ public:
|
||||||
// Connect to host (returns PEER_NONE on error).
|
// Connect to host (returns PEER_NONE on error).
|
||||||
PeerID Connect(addr_type ip, uint16 port, const char* our_class, double retry, bool use_ssl);
|
PeerID Connect(addr_type ip, uint16 port, const char* our_class, double retry, bool use_ssl);
|
||||||
|
|
||||||
|
// Close connection to host.
|
||||||
|
bool CloseConnection(PeerID peer);
|
||||||
|
|
||||||
// Request all events matching pattern from remote side.
|
// Request all events matching pattern from remote side.
|
||||||
bool RequestEvents(PeerID peer, RE_Matcher* pattern);
|
bool RequestEvents(PeerID peer, RE_Matcher* pattern);
|
||||||
|
|
||||||
|
|
|
@ -2117,6 +2117,12 @@ function connect%(ip: addr, p: port, our_class: string, retry: interval, ssl: bo
|
||||||
TYPE_COUNT);
|
TYPE_COUNT);
|
||||||
%}
|
%}
|
||||||
|
|
||||||
|
function disconnect%(p: event_peer%) : bool
|
||||||
|
%{
|
||||||
|
RemoteSerializer::PeerID id = p->AsRecordVal()->Lookup(0)->AsCount();
|
||||||
|
return new Val(remote_serializer->CloseConnection(id), TYPE_BOOL);
|
||||||
|
%}
|
||||||
|
|
||||||
function request_remote_events%(p: event_peer, handlers: pattern%) : bool
|
function request_remote_events%(p: event_peer, handlers: pattern%) : bool
|
||||||
%{
|
%{
|
||||||
RemoteSerializer::PeerID id = p->AsRecordVal()->Lookup(0)->AsCount();
|
RemoteSerializer::PeerID id = p->AsRecordVal()->Lookup(0)->AsCount();
|
||||||
|
|
|
@ -73,6 +73,7 @@ event remote_connection_handshake_done(p: event_peer)
|
||||||
$vc=vector(10, 20, 30),
|
$vc=vector(10, 20, 30),
|
||||||
$ve=empty_vector
|
$ve=empty_vector
|
||||||
]);
|
]);
|
||||||
|
disconnect(p);
|
||||||
}
|
}
|
||||||
@TEST-END-FILE
|
@TEST-END-FILE
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ event remote_connection_handshake_done(p: event_peer)
|
||||||
{
|
{
|
||||||
Log::add_filter(Test::LOG, [$name="f2", $path="test.failure", $pred=fail]);
|
Log::add_filter(Test::LOG, [$name="f2", $path="test.failure", $pred=fail]);
|
||||||
|
|
||||||
local cid = [$orig_h=1.2.3.4, $orig_p=1234/tcp, $resp_h=2.3.4.5, $resp_p=80/tcp];
|
local cid = [$orig_h=1.2.3.4, $orig_p=1234/tcp, $resp_h=2.3.4.5, $resp_p=80/tcp];
|
||||||
|
|
||||||
local r: Log = [$t=network_time(), $id=cid, $status="success"];
|
local r: Log = [$t=network_time(), $id=cid, $status="success"];
|
||||||
|
|
||||||
|
@ -61,6 +61,7 @@ event remote_connection_handshake_done(p: event_peer)
|
||||||
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="UK"]);
|
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="UK"]);
|
||||||
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="success", $country="BR"]);
|
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="success", $country="BR"]);
|
||||||
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="MX"]);
|
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="MX"]);
|
||||||
|
disconnect(p);
|
||||||
}
|
}
|
||||||
@TEST-END-FILE
|
@TEST-END-FILE
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue