Store a single map of Sessions instead of split maps of Connections.

This commit also includes:
- Storing the transport protocol in ConnID and ConnIDKey to allow tcp and
  udp connections from the same IP/Port combinations. This happens in the
  core.cisco-fabric-path test, for example.
- Lots of test updates. The reasons for these are two fold. First, with
  the change to only store a single map means that TCP, UDP, and ICMP
  connections are now mixed. When Zeek drains the map at shutdown, it drains
  each of those protocols together instead of separately. The second is
  because of how Sessions are stored in the map. We're now storing them
  keyed by the hash of the key stored by the Session objects, which causes
  them to again be in the map in a different order.
This commit is contained in:
Tim Wojtulewicz 2021-04-08 15:46:49 -07:00
parent 008e2cbaef
commit f7e3556a67
22 changed files with 263 additions and 367 deletions

View file

@ -61,6 +61,7 @@ struct ConnID {
uint32_t src_port;
uint32_t dst_port;
bool is_one_way; // if true, don't canonicalize order
TransportProto proto;
};
static inline int addr_port_canon_lt(const IPAddr& addr1, uint32_t p1,

View file

@ -42,6 +42,8 @@ detail::ConnIDKey detail::BuildConnIDKey(const ConnID& id)
key.port2 = id.src_port;
}
key.transport = id.proto;
return key;
}

View file

@ -26,8 +26,9 @@ struct ConnIDKey {
in6_addr ip2;
uint16_t port1;
uint16_t port2;
TransportProto transport;
ConnIDKey() : port1(0), port2(0)
ConnIDKey() : port1(0), port2(0), transport(TRANSPORT_UNKNOWN)
{
memset(&ip1, 0, sizeof(in6_addr));
memset(&ip2, 0, sizeof(in6_addr));

View file

@ -32,7 +32,6 @@
#include "zeek/analyzer/protocol/stepping-stone/events.bif.h"
zeek::NetSessions* zeek::sessions;
zeek::NetSessions*& sessions = zeek::sessions;
namespace zeek {
@ -45,14 +44,7 @@ NetSessions::NetSessions()
NetSessions::~NetSessions()
{
for ( const auto& entry : tcp_conns )
Unref(entry.second);
for ( const auto& entry : udp_conns )
Unref(entry.second);
for ( const auto& entry : icmp_conns )
Unref(entry.second);
detail::fragment_mgr->Clear();
Clear();
}
void NetSessions::Done()
@ -84,7 +76,6 @@ void NetSessions::ProcessTransportLayer(double t, const Packet* pkt, size_t rema
ConnID id;
id.src_addr = ip_hdr->SrcAddr();
id.dst_addr = ip_hdr->DstAddr();
ConnectionMap* d = nullptr;
BifEnum::Tunnel::Type tunnel_type = BifEnum::Tunnel::IP;
switch ( proto ) {
@ -94,7 +85,7 @@ void NetSessions::ProcessTransportLayer(double t, const Packet* pkt, size_t rema
id.src_port = tp->th_sport;
id.dst_port = tp->th_dport;
id.is_one_way = false;
d = &tcp_conns;
id.proto = TRANSPORT_TCP;
break;
}
@ -104,7 +95,7 @@ void NetSessions::ProcessTransportLayer(double t, const Packet* pkt, size_t rema
id.src_port = up->uh_sport;
id.dst_port = up->uh_dport;
id.is_one_way = false;
d = &udp_conns;
id.proto = TRANSPORT_UDP;
break;
}
@ -116,11 +107,9 @@ void NetSessions::ProcessTransportLayer(double t, const Packet* pkt, size_t rema
id.dst_port = analyzer::icmp::ICMP4_counterpart(icmpp->icmp_type,
icmpp->icmp_code,
id.is_one_way);
id.src_port = htons(id.src_port);
id.dst_port = htons(id.dst_port);
d = &icmp_conns;
id.proto = TRANSPORT_ICMP;
break;
}
@ -132,11 +121,9 @@ void NetSessions::ProcessTransportLayer(double t, const Packet* pkt, size_t rema
id.dst_port = analyzer::icmp::ICMP6_counterpart(icmpp->icmp_type,
icmpp->icmp_code,
id.is_one_way);
id.src_port = htons(id.src_port);
id.dst_port = htons(id.dst_port);
d = &icmp_conns;
id.proto = TRANSPORT_ICMP;
break;
}
@ -145,20 +132,21 @@ void NetSessions::ProcessTransportLayer(double t, const Packet* pkt, size_t rema
return;
}
detail::ConnIDKey key = detail::BuildConnIDKey(id);
detail::ConnIDKey conn_key = detail::BuildConnIDKey(id);
detail::SessionKey key(&conn_key, sizeof(conn_key), false);
Connection* conn = nullptr;
// FIXME: The following is getting pretty complex. Need to split up
// into separate functions.
auto it = d->find(key);
if ( it != d->end() )
conn = it->second;
auto it = session_map.find(key);
if (it != session_map.end() )
conn = static_cast<Connection*>(it->second);
if ( ! conn )
{
conn = NewConn(key, t, &id, data, proto, ip_hdr->FlowLabel(), pkt);
conn = NewConn(conn_key, t, &id, data, proto, ip_hdr->FlowLabel(), pkt);
if ( conn )
InsertConnection(d, key, conn);
InsertSession(std::move(key), conn);
}
else
{
@ -168,9 +156,9 @@ void NetSessions::ProcessTransportLayer(double t, const Packet* pkt, size_t rema
conn->Event(connection_reused, nullptr);
Remove(conn);
conn = NewConn(key, t, &id, data, proto, ip_hdr->FlowLabel(), pkt);
conn = NewConn(conn_key, t, &id, data, proto, ip_hdr->FlowLabel(), pkt);
if ( conn )
InsertConnection(d, key, conn);
InsertSession(std::move(key), conn);
}
else
{
@ -341,28 +329,15 @@ Connection* NetSessions::FindConnection(Val* v)
id.dst_port = htons((unsigned short) resp_portv->Port());
id.is_one_way = false; // ### incorrect for ICMP connections
id.proto = orig_portv->PortType();
detail::ConnIDKey key = detail::BuildConnIDKey(id);
ConnectionMap* d;
if ( orig_portv->IsTCP() )
d = &tcp_conns;
else if ( orig_portv->IsUDP() )
d = &udp_conns;
else if ( orig_portv->IsICMP() )
d = &icmp_conns;
else
{
// This can happen due to pseudo-connections we
// construct, for example for packet headers embedded
// in ICMPs.
return nullptr;
}
detail::ConnIDKey conn_key = detail::BuildConnIDKey(id);
detail::SessionKey key(&conn_key, sizeof(conn_key), false);
Connection* conn = nullptr;
auto it = d->find(key);
if ( it != d->end() )
conn = it->second;
auto it = session_map.find(key);
if ( it != session_map.end() )
conn = static_cast<Connection*>(it->second);
return conn;
}
@ -371,85 +346,53 @@ void NetSessions::Remove(Session* s)
{
Connection* c = static_cast<Connection*>(s);
if ( c->IsKeyValid() )
if ( s->IsKeyValid() )
{
const detail::ConnIDKey& key = c->Key();
c->CancelTimers();
s->CancelTimers();
s->Done();
s->RemovalEvent();
c->Done();
c->RemovalEvent();
// Clears out the session's copy of the key so that if the
// session has been Ref()'d somewhere, we know that on a future
// call to Remove() that it's no longer in the map.
detail::SessionKey key = s->SessionKey(false);
// Zero out c's copy of the key, so that if c has been Ref()'d
// up, we know on a future call to Remove() that it's no
// longer in the dictionary.
c->ClearKey();
switch ( c->ConnTransport() ) {
case TRANSPORT_TCP:
if ( tcp_conns.erase(key) == 0 )
reporter->InternalWarning("connection missing");
else
if ( session_map.erase(key) == 0 )
reporter->InternalWarning("connection missing");
else
switch ( c->ConnTransport() ) {
case TRANSPORT_TCP:
stats.GetOrAdd({{"tcp", "num_conns"}}).Dec();
break;
case TRANSPORT_UDP:
if ( udp_conns.erase(key) == 0 )
reporter->InternalWarning("connection missing");
else
break;
case TRANSPORT_UDP:
stats.GetOrAdd({{"udp", "num_conns"}}).Dec();
break;
case TRANSPORT_ICMP:
if ( icmp_conns.erase(key) == 0 )
reporter->InternalWarning("connection missing");
else
break;
case TRANSPORT_ICMP:
stats.GetOrAdd({{"icmp", "num_conns"}}).Dec();
break;
break;
case TRANSPORT_UNKNOWN: break;
}
case TRANSPORT_UNKNOWN:
reporter->InternalWarning("unknown transport when removing connection");
break;
}
Unref(c);
s->ClearKey();
Unref(s);
}
}
void NetSessions::Insert(Connection* c)
void NetSessions::Insert(Session* s)
{
assert(c->IsKeyValid());
assert(s->IsKeyValid());
Connection* old = nullptr;
Session* old = nullptr;
detail::SessionKey key = s->SessionKey(true);
switch ( c->ConnTransport() ) {
// Remove first. Otherwise the map would still reference the old key for
// already existing connections.
auto it = session_map.find(key);
if ( it != session_map.end() )
old = it->second;
case TRANSPORT_TCP:
old = LookupConn(tcp_conns, c->Key());
tcp_conns.erase(c->Key());
InsertConnection(&tcp_conns, c->Key(), c);
break;
session_map.erase(key);
InsertSession(std::move(key), s);
case TRANSPORT_UDP:
old = LookupConn(udp_conns, c->Key());
udp_conns.erase(c->Key());
InsertConnection(&udp_conns, c->Key(), c);
break;
case TRANSPORT_ICMP:
old = LookupConn(icmp_conns, c->Key());
icmp_conns.erase(c->Key());
InsertConnection(&icmp_conns, c->Key(), c);
break;
default:
reporter->InternalWarning("unknown connection type");
Unref(c);
return;
}
if ( old && old != c )
if ( old && old != s )
{
// Some clean-ups similar to those in Remove() (but invisible
// to the script layer).
@ -461,40 +404,20 @@ void NetSessions::Insert(Connection* c)
void NetSessions::Drain()
{
for ( const auto& entry : tcp_conns )
for ( const auto& entry : session_map )
{
Connection* tc = entry.second;
Session* tc = entry.second;
tc->Done();
tc->RemovalEvent();
}
for ( const auto& entry : udp_conns )
{
Connection* uc = entry.second;
uc->Done();
uc->RemovalEvent();
}
for ( const auto& entry : icmp_conns )
{
Connection* ic = entry.second;
ic->Done();
ic->RemovalEvent();
}
}
void NetSessions::Clear()
{
for ( const auto& entry : tcp_conns )
Unref(entry.second);
for ( const auto& entry : udp_conns )
Unref(entry.second);
for ( const auto& entry : icmp_conns )
for ( const auto& entry : session_map )
Unref(entry.second);
tcp_conns.clear();
udp_conns.clear();
icmp_conns.clear();
session_map.clear();
detail::fragment_mgr->Clear();
}
@ -578,15 +501,6 @@ Connection* NetSessions::NewConn(const detail::ConnIDKey& k, double t, const Con
return conn;
}
Connection* NetSessions::LookupConn(const ConnectionMap& conns, const detail::ConnIDKey& key)
{
auto it = conns.find(key);
if ( it != conns.end() )
return it->second;
return nullptr;
}
bool NetSessions::IsLikelyServerPort(uint32_t port, TransportProto proto) const
{
// We keep a cached in-core version of the table to speed up the lookup.
@ -695,13 +609,7 @@ unsigned int NetSessions::ConnectionMemoryUsage()
// Connections have been flushed already.
return 0;
for ( const auto& entry : tcp_conns )
mem += entry.second->MemoryAllocation();
for ( const auto& entry : udp_conns )
mem += entry.second->MemoryAllocation();
for ( const auto& entry : icmp_conns )
for ( const auto& entry : session_map )
mem += entry.second->MemoryAllocation();
return mem;
@ -715,13 +623,7 @@ unsigned int NetSessions::ConnectionMemoryUsageConnVals()
// Connections have been flushed already.
return 0;
for ( const auto& entry : tcp_conns )
mem += entry.second->MemoryAllocationConnVal();
for ( const auto& entry : udp_conns )
mem += entry.second->MemoryAllocationConnVal();
for ( const auto& entry : icmp_conns )
for ( const auto& entry : session_map )
mem += entry.second->MemoryAllocationConnVal();
return mem;
@ -735,48 +637,42 @@ unsigned int NetSessions::MemoryAllocation()
return ConnectionMemoryUsage()
+ padded_sizeof(*this)
+ (tcp_conns.size() * (sizeof(ConnectionMap::key_type) + sizeof(ConnectionMap::value_type)))
+ (udp_conns.size() * (sizeof(ConnectionMap::key_type) + sizeof(ConnectionMap::value_type)))
+ (icmp_conns.size() * (sizeof(ConnectionMap::key_type) + sizeof(ConnectionMap::value_type)))
+ (session_map.size() * (sizeof(SessionMap::key_type) + sizeof(SessionMap::value_type)))
+ detail::fragment_mgr->MemoryAllocation();
// FIXME: MemoryAllocation() not implemented for rest.
;
}
void NetSessions::InsertConnection(ConnectionMap* m, const detail::ConnIDKey& key, Connection* conn)
void NetSessions::InsertSession(detail::SessionKey key, Session* session)
{
(*m)[key] = conn;
key.CopyData();
session_map.insert_or_assign(std::move(key), session);
switch ( conn->ConnTransport() )
std::string protocol;
switch ( static_cast<Connection*>(session)->ConnTransport() )
{
case TRANSPORT_TCP:
{
stats.GetOrAdd({{"tcp", "num_conns"}}).Inc();
stats.GetOrAdd({{"tcp", "cumulative_conns"}}).Inc();
auto max = stats.GetOrAdd({{"tcp", "max_conns"}});
if ( m->size() > max.Value() )
max.Inc();
protocol = "tcp";
break;
}
case TRANSPORT_UDP:
{
stats.GetOrAdd({{"udp", "num_conns"}}).Inc();
stats.GetOrAdd({{"udp", "cumulative_conns"}}).Inc();
auto max = stats.GetOrAdd({{"udp", "max_conns"}});
if ( m->size() > max.Value() )
max.Inc();
protocol = "udp";
break;
}
case TRANSPORT_ICMP:
{
stats.GetOrAdd({{"icmp", "num_conns"}}).Inc();
stats.GetOrAdd({{"icmp", "cumulative_conns"}}).Inc();
auto max = stats.GetOrAdd({{"icmp", "max_conns"}});
if ( m->size() > max.Value() )
max.Inc();
protocol = "icmp";
break;
}
default: break;
default:
break;
}
if ( ! protocol.empty() )
{
auto max = stats.GetOrAdd({{protocol, "max_conns"}});
auto num = stats.GetOrAdd({{protocol, "num_conns"}});
num.Inc();
stats.GetOrAdd({{protocol, "cumulative_conns"}}).Inc();
if ( num.Value() > max.Value() )
max.Inc();
}
}

View file

@ -10,6 +10,8 @@
#include "zeek/NetVar.h"
#include "zeek/analyzer/protocol/tcp/Stats.h"
#include "zeek/telemetry/Gauge.h"
#include "zeek/Hash.h"
#include "zeek/Session.h"
namespace zeek {
@ -18,7 +20,6 @@ namespace detail { class PacketFilter; }
class EncapsulationStack;
class Packet;
class Connection;
class Session;
struct ConnID;
struct SessionStats {
@ -39,7 +40,7 @@ struct SessionStats {
uint64_t num_packets;
};
class NetSessions {
class NetSessions final {
public:
NetSessions();
~NetSessions();
@ -61,7 +62,7 @@ public:
Connection* FindConnection(const detail::ConnIDKey& key, TransportProto proto);
void Remove(Session* s);
void Insert(Connection* c);
void Insert(Session* c);
// Generating connection_pending events for all connections
// that are still active.
@ -82,7 +83,7 @@ public:
unsigned int CurrentConnections()
{
return tcp_conns.size() + udp_conns.size() + icmp_conns.size();
return session_map.size();
}
/**
@ -130,16 +131,13 @@ public:
analyzer::tcp::TCPStateStats tcp_stats; // keeps statistics on TCP states
protected:
friend class ConnCompressor;
using ConnectionMap = std::map<detail::ConnIDKey, Connection*>;
using SessionMap = std::map<detail::SessionKey, Session*>;
Connection* NewConn(const detail::ConnIDKey& k, double t, const ConnID* id,
const u_char* data, int proto, uint32_t flow_label,
const Packet* pkt);
Connection* LookupConn(const ConnectionMap& conns, const detail::ConnIDKey& key);
// Returns true if the port corresonds to an application
// for which there's a Bro analyzer (even if it might not
// be used by the present policy script), or it's more
@ -167,12 +165,9 @@ protected:
// the new one. Connection count stats get updated either way (so most
// cases should likely check that the key is not already in the map to
// avoid unnecessary incrementing of connecting counts).
void InsertConnection(ConnectionMap* m, const detail::ConnIDKey& key, Connection* conn);
ConnectionMap tcp_conns;
ConnectionMap udp_conns;
ConnectionMap icmp_conns;
void InsertSession(detail::SessionKey key, Session* session);
SessionMap session_map;
telemetry::IntGaugeFamily stats;
};

View file

@ -25,9 +25,10 @@ static zeek::Connection* add_connection()
conn_id.src_port = htons(23132);
conn_id.dst_port = htons(80);
conn_id.is_one_way = false;
conn_id.proto = TRANSPORT_TCP;
zeek::detail::ConnIDKey key = zeek::detail::BuildConnIDKey(conn_id);
zeek::Connection* conn = new zeek::Connection(zeek::sessions, key, network_time_start,
&conn_id, 1, &p);
&conn_id, 1, &p);
conn->SetTransport(TRANSPORT_TCP);
zeek::sessions->Insert(conn);
return conn;