Remove concept of multiple timer managers

- All timers are now handled by a single global timer manager, which simplifies how they handled by the IOSource manager.
- This change flows down a number of changes to other parts of the code. The timer manager tag field is removed, which means that matching connections to a timer manager is also removed. This removes the ability to tag a connection as internal or external, since that's how the connections where differentiated. This in turn removes the `current_conns_extern` field from the `ConnStats` record type in the script layer.
This commit is contained in:
Tim Wojtulewicz 2019-12-02 17:10:57 -07:00
parent 2dcc936787
commit be42608b51
15 changed files with 28 additions and 284 deletions

View file

@ -548,7 +548,6 @@ type NetStats: record {
type ConnStats: record {
total_conns: count; ##<
current_conns: count; ##<
current_conns_extern: count; ##<
sess_current_conns: count; ##<
num_packets: count;

View file

@ -29,8 +29,6 @@ event rexmit_inconsistency(c: connection, t1: string, t2: string, tcp_flags: str
event content_gap(c: connection, is_orig: bool, seq: count, length: count)
{
NOTICE([$note=Content_Gap, $conn=c,
$msg=fmt("%s content gap (%s %d/%d)%s",
id_string(c$id), is_orig ? ">" : "<", seq, length,
is_external_connection(c) ? " [external]" : "")]);
$msg=fmt("%s content gap (%s %d/%d)",
id_string(c$id), is_orig ? ">" : "<", seq, length)]);
}

View file

@ -53,7 +53,6 @@ void ConnectionTimer::Dispatch(double t, int is_expire)
uint64_t Connection::total_connections = 0;
uint64_t Connection::current_connections = 0;
uint64_t Connection::external_connections = 0;
Connection::Connection(NetSessions* s, const ConnIDKey& k, double t, const ConnID* id,
uint32_t flow, const Packet* pkt,
@ -115,23 +114,10 @@ Connection::Connection(NetSessions* s, const ConnIDKey& k, double t, const ConnI
++current_connections;
++total_connections;
std::string* tag = current_iosrc->GetCurrentTag();
conn_timer_mgr = tag ? new std::string(*tag) : nullptr;
if ( arg_encap )
encapsulation = new EncapsulationStack(*arg_encap);
else
encapsulation = 0;
if ( conn_timer_mgr )
{
++external_connections;
// We schedule a timer which removes this connection from memory
// indefinitively into the future. Ii will expire when the timer
// mgr is drained but not before.
ADD_TIMER(&Connection::RemoveConnectionTimer, 1e20, 1,
TIMER_REMOVE_CONNECTION);
}
}
Connection::~Connection()
@ -148,12 +134,9 @@ Connection::~Connection()
}
delete root_analyzer;
delete conn_timer_mgr;
delete encapsulation;
--current_connections;
if ( conn_timer_mgr )
--external_connections;
}
void Connection::CheckEncapsulation(const EncapsulationStack* arg_encap)
@ -512,14 +495,14 @@ void Connection::ConnectionEvent(EventHandlerPtr f, analyzer::Analyzer* a, val_l
// "this" is passed as a cookie for the event
mgr.QueueEvent(f, std::move(vl), SOURCE_LOCAL,
a ? a->GetID() : 0, GetTimerMgr(), this);
a ? a->GetID() : 0, timer_mgr, this);
}
void Connection::ConnectionEventFast(EventHandlerPtr f, analyzer::Analyzer* a, val_list vl)
{
// "this" is passed as a cookie for the event
mgr.QueueEventFast(f, std::move(vl), SOURCE_LOCAL,
a ? a->GetID() : 0, GetTimerMgr(), this);
a ? a->GetID() : 0, timer_mgr, this);
}
void Connection::ConnectionEvent(EventHandlerPtr f, analyzer::Analyzer* a, val_list* vl)
@ -547,7 +530,7 @@ void Connection::AddTimer(timer_func timer, double t, bool do_expire,
return;
Timer* conn_timer = new ConnectionTimer(this, timer, t, do_expire, type);
GetTimerMgr()->Add(conn_timer);
timer_mgr->Add(conn_timer);
timers.push_back(conn_timer);
}
@ -566,25 +549,12 @@ void Connection::CancelTimers()
std::copy(timers.begin(), timers.end(), std::back_inserter(tmp));
for ( const auto& timer : tmp )
GetTimerMgr()->Cancel(timer);
timer_mgr->Cancel(timer);
timers_canceled = 1;
timers.clear();
}
TimerMgr* Connection::GetTimerMgr() const
{
if ( ! conn_timer_mgr )
// Global manager.
return timer_mgr;
// We need to check whether the local timer manager still exists;
// it may have already been timed out, in which case we fall back
// to the global manager (though this should be rare).
TimerMgr* local_mgr = sessions->LookupTimerMgr(conn_timer_mgr, false);
return local_mgr ? local_mgr : timer_mgr;
}
void Connection::FlipRoles()
{
IPAddr tmp_addr = resp_addr;

View file

@ -228,11 +228,6 @@ public:
void Describe(ODesc* d) const override;
void IDString(ODesc* d) const;
TimerMgr* GetTimerMgr() const;
// Returns true if connection has been received externally.
bool IsExternal() const { return conn_timer_mgr != 0; }
// Statistics.
// Just a lower bound.
@ -243,8 +238,6 @@ public:
{ return total_connections; }
static uint64_t CurrentConnections()
{ return current_connections; }
static uint64_t CurrentExternalConnections()
{ return external_connections; }
// Returns true if the history was already seen, false otherwise.
int CheckHistory(uint32_t mask, char code)
@ -320,8 +313,6 @@ protected:
ConnIDKey key;
bool key_valid;
// Timer manager to use for this conn (or nil).
std::string* conn_timer_mgr;
timer_list timers;
IPAddr orig_addr;
@ -353,7 +344,6 @@ protected:
// Count number of connections.
static uint64_t total_connections;
static uint64_t current_connections;
static uint64_t external_connections;
string history;
uint32_t hist_seen;

View file

@ -213,12 +213,9 @@ void net_init(const std::string& interface,
void expire_timers(iosource::PktSrc* src_ps)
{
SegmentProfiler(segment_logger, "expiring-timers");
TimerMgr* tmgr =
src_ps ? sessions->LookupTimerMgr(src_ps->GetCurrentTag())
: timer_mgr;
current_dispatched +=
tmgr->Advance(network_time,
timer_mgr->Advance(network_time,
max_timer_expires - current_dispatched);
}
@ -227,10 +224,8 @@ void net_packet_dispatch(double t, const Packet* pkt, iosource::PktSrc* src_ps)
if ( ! bro_start_network_time )
bro_start_network_time = t;
TimerMgr* tmgr = sessions->LookupTimerMgr(src_ps->GetCurrentTag());
// network_time never goes back.
net_update_time(tmgr->Time() < t ? t : tmgr->Time());
net_update_time(timer_mgr->Time() < t ? t : timer_mgr->Time());
current_pktsrc = src_ps;
current_iosrc = src_ps;

View file

@ -42,32 +42,6 @@ enum NetBIOS_Service {
NetSessions* sessions;
void TimerMgrExpireTimer::Dispatch(double t, int is_expire)
{
if ( mgr->LastAdvance() + timer_mgr_inactivity_timeout < timer_mgr->Time() )
{
// Expired.
DBG_LOG(DBG_TM, "TimeMgr %p has timed out", mgr);
mgr->Expire();
// Make sure events are executed. They depend on the TimerMgr.
::mgr.Drain();
sessions->timer_mgrs.erase(mgr->GetTag());
delete mgr;
}
else
{
// Reinstall timer.
if ( ! is_expire )
{
double n = mgr->LastAdvance() +
timer_mgr_inactivity_timeout;
timer_mgr->Add(new TimerMgrExpireTimer(n, mgr));
}
}
}
void IPTunnelTimer::Dispatch(double t, int is_expire)
{
NetSessions::IPTunnelMap::const_iterator it =
@ -210,59 +184,6 @@ void NetSessions::NextPacket(double t, const Packet* pkt)
DumpPacket(pkt);
}
int NetSessions::CheckConnectionTag(Connection* conn)
{
if ( current_iosrc->GetCurrentTag() )
{
// Packet is tagged.
if ( conn->GetTimerMgr() == timer_mgr )
{
// Connection uses global timer queue. But the
// packet has a tag that means we got it externally,
// probably from the Time Machine.
DBG_LOG(DBG_TM, "got packet with tag %s for already"
"known connection, reinstantiating",
current_iosrc->GetCurrentTag()->c_str());
return 0;
}
else
{
// Connection uses local timer queue.
TimerMgrMap::iterator i =
timer_mgrs.find(*current_iosrc->GetCurrentTag());
if ( i != timer_mgrs.end() &&
conn->GetTimerMgr() != i->second )
{
// Connection uses different local queue
// than the tag for the current packet
// indicates.
//
// This can happen due to:
// (1) getting same packets with
// different tags
// (2) timer mgr having already expired
DBG_LOG(DBG_TM, "packet ignored due old/inconsistent tag");
return -1;
}
return 1;
}
}
// Packet is not tagged.
if ( conn->GetTimerMgr() != timer_mgr )
{
// Connection does not use the global timer queue. That
// means that this is a live packet belonging to a
// connection for which we have already switched to
// processing external input.
DBG_LOG(DBG_TM, "packet ignored due to processing it in external data");
return -1;
}
return 1;
}
static unsigned int gre_header_len(uint16_t flags)
{
unsigned int len = 4; // Always has 2 byte flags and 2 byte protocol type.
@ -735,13 +656,8 @@ void NetSessions::DoNextPacket(double t, const Packet* pkt, const IP_Hdr* ip_hdr
else
{
// We already know that connection.
int consistent = CheckConnectionTag(conn);
if ( consistent < 0 )
return;
if ( ! consistent || conn->IsReuse(t, data) )
if ( conn->IsReuse(t, data) )
{
if ( consistent )
conn->Event(connection_reused, 0);
Remove(conn);
@ -1158,8 +1074,6 @@ void NetSessions::Drain()
ic->Done();
ic->RemovalEvent();
}
ExpireTimerMgrs();
}
void NetSessions::GetStats(SessionStats& s) const
@ -1233,25 +1147,9 @@ Connection* NetSessions::NewConn(const ConnIDKey& k, double t, const ConnID* id,
return 0;
}
bool external = conn->IsExternal();
if ( external )
conn->AppendAddl(fmt("tag=%s",
conn->GetTimerMgr()->GetTag().c_str()));
if ( new_connection )
{
conn->Event(new_connection, 0);
if ( external && connection_external )
{
conn->ConnectionEventFast(connection_external, 0, {
conn->BuildConnVal(),
new StringVal(conn->GetTimerMgr()->GetTag().c_str()),
});
}
}
return conn;
}
@ -1338,45 +1236,6 @@ bool NetSessions::WantConnection(uint16_t src_port, uint16_t dst_port,
return true;
}
TimerMgr* NetSessions::LookupTimerMgr(const std::string* tag, bool create)
{
if ( ! tag )
{
DBG_LOG(DBG_TM, "no tag, using global timer mgr %p", timer_mgr);
return timer_mgr;
}
TimerMgrMap::iterator i = timer_mgrs.find(*tag);
if ( i != timer_mgrs.end() )
{
DBG_LOG(DBG_TM, "tag %s, using non-global timer mgr %p", tag->c_str(), i->second);
return i->second;
}
else
{
if ( ! create )
return 0;
// Create new queue for tag.
TimerMgr* mgr = new PQ_TimerMgr(*tag);
DBG_LOG(DBG_TM, "tag %s, creating new non-global timer mgr %p", tag->c_str(), mgr);
timer_mgrs.insert(TimerMgrMap::value_type(*tag, mgr));
double t = timer_mgr->Time() + timer_mgr_inactivity_timeout;
timer_mgr->Add(new TimerMgrExpireTimer(t, mgr));
return mgr;
}
}
void NetSessions::ExpireTimerMgrs()
{
for ( TimerMgrMap::iterator i = timer_mgrs.begin();
i != timer_mgrs.end(); ++i )
{
i->second->Expire();
delete i->second;
}
}
void NetSessions::DumpPacket(const Packet *pkt, int len)
{
if ( ! pkt_dumper )

View file

@ -44,20 +44,6 @@ struct SessionStats {
uint64_t num_packets;
};
// Drains and deletes a timer manager if it hasn't seen any advances
// for an interval timer_mgr_inactivity_timeout.
class TimerMgrExpireTimer : public Timer {
public:
TimerMgrExpireTimer(double t, TimerMgr* arg_mgr)
: Timer(t, TIMER_TIMERMGR_EXPIRE), mgr(arg_mgr)
{ }
void Dispatch(double t, int is_expire) override;
protected:
TimerMgr* mgr;
};
class NetSessions {
public:
NetSessions();
@ -101,13 +87,6 @@ public:
return packet_filter;
}
// Looks up timer manager associated with tag. If tag is unknown and
// "create" is true, creates new timer manager and stores it. Returns
// global timer manager if tag is nil.
TimerMgr* LookupTimerMgr(const std::string* tag, bool create = true);
void ExpireTimerMgrs();
analyzer::stepping_stone::SteppingStoneManager* GetSTPManager() { return stp_manager; }
unsigned int CurrentConnections()
@ -168,7 +147,6 @@ public:
protected:
friend class ConnCompressor;
friend class TimerMgrExpireTimer;
friend class IPTunnelTimer;
using ConnectionMap = std::map<ConnIDKey, Connection*>;
@ -180,13 +158,6 @@ protected:
Connection* LookupConn(const ConnectionMap& conns, const ConnIDKey& key);
// Check whether the tag of the current packet is consistent with
// the given connection. Returns:
// -1 if current packet is to be completely ignored.
// 0 if tag is not consistent and new conn should be instantiated.
// 1 if tag is consistent, i.e., packet is part of connection.
int CheckConnectionTag(Connection* conn);
// Returns true if the port corresonds to an application
// for which there's a Bro analyzer (even if it might not
// be used by the present policy script), or it's more
@ -243,11 +214,6 @@ protected:
int dump_this_packet; // if true, current packet should be recorded
uint64_t num_packets_processed;
PacketProfiler* pkt_profiler;
// We may use independent timer managers for different sets of related
// activity. The managers are identified by a unique tag.
typedef std::map<std::string, TimerMgr*> TimerMgrMap;
TimerMgrMap timer_mgrs;
};

View file

@ -117,12 +117,11 @@ void ProfileLogger::Log()
int conn_mem_use = expensive ? sessions->ConnectionMemoryUsage() : 0;
file->Write(fmt("%.06f Conns: total=%" PRIu64 " current=%" PRIu64 "/%" PRIi32 " ext=%" PRIu64 " mem=%" PRIi32 "K avg=%.1f table=%" PRIu32 "K connvals=%" PRIu32 "K\n",
file->Write(fmt("%.06f Conns: total=%" PRIu64 " current=%" PRIu64 "/%" PRIi32 " mem=%" PRIi32 "K avg=%.1f table=%" PRIu32 "K connvals=%" PRIu32 "K\n",
network_time,
Connection::TotalConnections(),
Connection::CurrentConnections(),
sessions->CurrentConnections(),
Connection::CurrentExternalConnections(),
conn_mem_use,
expensive ? (conn_mem_use / double(sessions->CurrentConnections())) : 0,
expensive ? sessions->MemoryAllocation() / 1024 : 0,

View file

@ -60,12 +60,11 @@ void Timer::Describe(ODesc* d) const
unsigned int TimerMgr::current_timers[NUM_TIMER_TYPES];
TimerMgr::TimerMgr(const std::string& arg_tag)
TimerMgr::TimerMgr()
{
t = 0.0;
num_expired = 0;
last_advance = last_timestamp = 0;
tag = arg_tag;
if ( iosource_mgr )
iosource_mgr->Register(this, true);
@ -73,13 +72,11 @@ TimerMgr::TimerMgr(const std::string& arg_tag)
TimerMgr::~TimerMgr()
{
DBG_LOG(DBG_TM, "deleting timer mgr %p", this);
}
int TimerMgr::Advance(double arg_t, int max_expire)
{
DBG_LOG(DBG_TM, "advancing %stimer mgr %p to %.6f",
this == timer_mgr ? "global " : "", this, arg_t);
DBG_LOG(DBG_TM, "advancing timer mgr to %.6f", arg_t);
t = arg_t;
last_timestamp = 0;
@ -111,7 +108,7 @@ void TimerMgr::InitPostScript()
}
PQ_TimerMgr::PQ_TimerMgr(const std::string& tag) : TimerMgr(tag)
PQ_TimerMgr::PQ_TimerMgr() : TimerMgr()
{
q = new PriorityQueue;
}
@ -123,8 +120,8 @@ PQ_TimerMgr::~PQ_TimerMgr()
void PQ_TimerMgr::Add(Timer* timer)
{
DBG_LOG(DBG_TM, "Adding timer %s to TimeMgr %p at %.6f",
timer_type_to_string(timer->Type()), this, timer->Time());
DBG_LOG(DBG_TM, "Adding timer %s (%p) at %.6f",
timer_type_to_string(timer->Type()), timer, timer->Time());
// Add the timer even if it's already expired - that way, if
// multiple already-added timers are added, they'll still
@ -140,8 +137,8 @@ void PQ_TimerMgr::Expire()
Timer* timer;
while ( (timer = Remove()) )
{
DBG_LOG(DBG_TM, "Dispatching timer %s in TimeMgr %p",
timer_type_to_string(timer->Type()), this);
DBG_LOG(DBG_TM, "Dispatching timer %s (%p)",
timer_type_to_string(timer->Type()), timer);
timer->Dispatch(t, 1);
--current_timers[timer->Type()];
delete timer;
@ -162,8 +159,8 @@ int PQ_TimerMgr::DoAdvance(double new_t, int max_expire)
// whether we should delete it too.
(void) Remove();
DBG_LOG(DBG_TM, "Dispatching timer %s in TimeMgr %p",
timer_type_to_string(timer->Type()), this);
DBG_LOG(DBG_TM, "Dispatching timer %s (%p)",
timer_type_to_string(timer->Type()), timer);
timer->Dispatch(new_t, 0);
delete timer;

View file

@ -104,8 +104,6 @@ public:
double Time() const { return t ? t : 1; } // 1 > 0
const std::string& GetTag() const { return tag; }
virtual int Size() const = 0;
virtual int PeakSize() const = 0;
virtual uint64_t CumulativeNum() const = 0;
@ -122,7 +120,7 @@ public:
// IOSource API methods
virtual double GetNextTimeout() override { return -1; }
virtual void Process() override;
virtual const char* Tag() override { return fmt("TimerMgr %s", tag.c_str()); }
virtual const char* Tag() override { return "TimerMgr"; }
/**
* Performs some extra initialization on a timer manager. This shouldn't
@ -131,7 +129,7 @@ public:
void InitPostScript();
protected:
explicit TimerMgr(const std::string& arg_tag);
TimerMgr();
virtual int DoAdvance(double t, int max_expire) = 0;
virtual void Remove(Timer* timer) = 0;
@ -139,7 +137,6 @@ protected:
double t;
double last_timestamp;
double last_advance;
std::string tag;
int num_expired;
@ -148,7 +145,7 @@ protected:
class PQ_TimerMgr : public TimerMgr {
public:
explicit PQ_TimerMgr(const std::string& arg_tag);
PQ_TimerMgr();
~PQ_TimerMgr() override;
void Add(Timer* timer) override;

View file

@ -731,7 +731,7 @@ void Analyzer::AddTimer(analyzer_timer_func timer, double t,
Timer* analyzer_timer = new
AnalyzerTimer(this, timer, t, do_expire, type);
Conn()->GetTimerMgr()->Add(analyzer_timer);
timer_mgr->Add(analyzer_timer);
timers.push_back(analyzer_timer);
}
@ -751,7 +751,7 @@ void Analyzer::CancelTimers()
// TODO: could be a for_each
for ( auto timer : tmp )
Conn()->GetTimerMgr()->Cancel(timer);
timer_mgr->Cancel(timer);
timers_canceled = 1;
timers.clear();
@ -923,4 +923,3 @@ void TransportLayerAnalyzer::PacketContents(const u_char* data, int len)
Event(packet_contents, contents);
}
}

View file

@ -70,17 +70,6 @@ public:
*/
virtual void Process() = 0;
/**
* Returns the tag of the timer manager associated with the last
* procesees data item.
*
* Can be overridden by derived classes.
*
* @return The tag, or null for the global timer manager.
*
*/
virtual std::string* GetCurrentTag() { return 0; }
/**
* Returns a descriptive tag representing the source for debugging.
*

View file

@ -550,7 +550,7 @@ int main(int argc, char** argv)
createCurrentDoc("1.0"); // Set a global XML document
#endif
timer_mgr = new PQ_TimerMgr("<GLOBAL>");
timer_mgr = new PQ_TimerMgr();
auto zeekygen_cfg = options.zeekygen_config_file.value_or("");
zeekygen_mgr = new zeekygen::Manager(zeekygen_cfg, bro_argv[0]);

View file

@ -88,7 +88,6 @@ function get_conn_stats%(%): ConnStats
r->Assign(n++, val_mgr->GetCount(Connection::TotalConnections()));
r->Assign(n++, val_mgr->GetCount(Connection::CurrentConnections()));
r->Assign(n++, val_mgr->GetCount(Connection::CurrentExternalConnections()));
r->Assign(n++, val_mgr->GetCount(sessions->CurrentConnections()));
SessionStats s;

View file

@ -1788,19 +1788,6 @@ function log10%(d: double%): double
#
# ===========================================================================
## Determines whether a connection has been received externally. For example,
## Broccoli or the Time Machine can send packets to Zeek via a mechanism that is
## one step lower than sending events. This function checks whether the packets
## of a connection stem from one of these external *packet sources*.
##
## c: The connection to test.
##
## Returns: True if *c* has been received externally.
function is_external_connection%(c: connection%) : bool
%{
return val_mgr->GetBool(c && c->IsExternal());
%}
## Returns the ID of the analyzer which raised the current event.
##
## Returns: The ID of the analyzer which raised the current event, or 0 if