Move threading classes to zeek namespaces

This commit is contained in:
Tim Wojtulewicz 2020-08-01 10:48:46 -07:00
parent f310795d79
commit 1262109e5a
42 changed files with 299 additions and 210 deletions

View file

@ -7,7 +7,7 @@
#include "Manager.h"
#include "util.h"
using namespace threading;
namespace zeek::threading {
static const int STD_FMT_BUF_LEN = 2048;
@ -193,3 +193,5 @@ void* BasicThread::launcher(void *arg)
return nullptr;
}
} // namespace zeek::threading

View file

@ -1,14 +1,16 @@
#pragma once
#include "zeek-config.h"
#include <stdint.h>
#include <iosfwd>
#include <thread>
namespace threading {
ZEEK_FORWARD_DECLARE_NAMESPACED(Manager, zeek, threading);
class Manager;
namespace zeek::threading {
/**
* Base class for all threads.
@ -211,4 +213,8 @@ private:
static uint64_t thread_counter;
};
} // namespace zeek::threading
namespace threading {
using BasicThread [[deprecated("Remove in v4.1. Use zeek::threading::BasicThread.")]] = zeek::threading::BasicThread;
}

View file

@ -8,10 +8,10 @@
#include "MsgThread.h"
#include "bro_inet_ntop.h"
using namespace threading;
using namespace formatter;
using threading::Value;
using threading::Field;
using zeek::threading::Value;
using zeek::threading::Field;
namespace zeek::threading {
Formatter::Formatter(threading::MsgThread* t)
{
@ -125,3 +125,5 @@ std::string Formatter::Render(TransportProto proto)
else
return "unknown";
}
} // namespace zeek::threading::formatter

View file

@ -7,11 +7,9 @@
#include "Type.h"
#include "SerialTypes.h"
namespace threading {
ZEEK_FORWARD_DECLARE_NAMESPACED(MsgThread, zeek, threading);
class MsgThread;
namespace formatter {
namespace zeek::threading {
/**
* A thread-safe class for converting values into some textual format. This
@ -164,4 +162,8 @@ private:
threading::MsgThread* thread;
};
}}
} // zeek::threading
namespace threading::formatter {
using Formatter [[deprecated("Remove in v4.1. Use zeek::threading::Formatter.")]] = zeek::threading::Formatter;
}

View file

@ -8,7 +8,8 @@
#include "Event.h"
#include "IPAddr.h"
using namespace threading;
namespace zeek::threading {
namespace detail {
void HeartbeatTimer::Dispatch(double t, bool is_expire)
{
@ -19,6 +20,8 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire)
thread_mgr->StartHeartbeatTimer();
}
} // namespace detail
Manager::Manager()
{
DBG_LOG(zeek::DBG_THREADING, "Creating thread manager ...");
@ -127,7 +130,7 @@ void Manager::SendHeartbeats()
void Manager::StartHeartbeatTimer()
{
heartbeat_timer_running = true;
zeek::detail::timer_mgr->Add(new HeartbeatTimer(network_time + zeek::BifConst::Threading::heartbeat_interval));
zeek::detail::timer_mgr->Add(new detail::HeartbeatTimer(network_time + zeek::BifConst::Threading::heartbeat_interval));
}
// Raise everything in here as warnings so it is passed to scriptland without
@ -269,3 +272,5 @@ const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats(
return stats;
}
} // namespace zeek::threading

View file

@ -7,7 +7,9 @@
#include <list>
#include <utility>
namespace zeek {
namespace threading {
namespace detail {
class HeartbeatTimer final : public zeek::detail::Timer {
public:
@ -21,6 +23,8 @@ protected:
void Init();
};
} // namespace detail
/**
* The thread manager coordinates all child threads. Once a BasicThread is
* instantitated, it gets addedd to the manager, which will delete it later
@ -103,7 +107,7 @@ public:
protected:
friend class BasicThread;
friend class MsgThread;
friend class HeartbeatTimer;
friend class detail::HeartbeatTimer;
/**
* Registers a new basic thread with the manager. This is
@ -151,10 +155,21 @@ private:
bool heartbeat_timer_running = false;
};
}
} // namespace threading
/**
* A singleton instance of the thread manager. All methods must only be
* called from Bro's main thread.
* called from Zeek's main thread.
*/
extern threading::Manager* thread_mgr;
} // namespace zeek
extern zeek::threading::Manager*& thread_mgr [[deprecated("Remove in v4.1. Use zeek::thread_mgr.")]];
namespace threading::detail {
using HeartbeatTimer [[deprecated("Remove in v4.1. Use zeek::threading::detail::HeartbeatTimer.")]] = zeek::threading::detail::HeartbeatTimer;
}
namespace threading {
using Manager [[deprecated("Remove in v4.1. Use zeek::threading::Manager.")]] = zeek::threading::Manager;
}

View file

@ -8,9 +8,11 @@
#include "Manager.h"
#include "iosource/Manager.h"
using namespace threading;
// Set by Zeek's main signal handler.
extern int signal_val;
namespace threading {
namespace zeek::threading {
namespace detail {
////// Messages.
@ -124,8 +126,6 @@ private:
};
#endif
}
// An event that the child wants to pass into the main event queue
class SendEventMessage final : public OutputMessage<MsgThread> {
public:
@ -151,13 +151,6 @@ private:
Value* *val;
};
////// Methods.
Message::~Message()
{
delete [] name;
}
bool ReporterMessage::Process()
{
switch ( type ) {
@ -197,6 +190,15 @@ bool ReporterMessage::Process()
return true;
}
} // namespace detail
////// Methods.
Message::~Message()
{
delete [] name;
}
MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullptr, this)
{
cnt_sent_in = cnt_sent_out = 0;
@ -219,9 +221,6 @@ MsgThread::~MsgThread()
zeek::iosource_mgr->UnregisterFd(flare.FD(), this);
}
// Set by Bro's main signal handler.
extern int signal_val;
void MsgThread::OnSignalStop()
{
if ( main_finished || Killed() || child_sent_finish )
@ -229,7 +228,7 @@ void MsgThread::OnSignalStop()
child_sent_finish = true;
// Signal thread to terminate.
SendIn(new FinishMessage(this, network_time), true);
SendIn(new detail::FinishMessage(this, network_time), true);
}
void MsgThread::OnWaitForStop()
@ -303,43 +302,43 @@ void MsgThread::Heartbeat()
if ( child_sent_finish )
return;
SendIn(new HeartbeatMessage(this, network_time, current_time()));
SendIn(new detail::HeartbeatMessage(this, network_time, current_time()));
}
void MsgThread::Finished()
{
child_finished = true;
SendOut(new FinishedMessage(this));
SendOut(new detail::FinishedMessage(this));
}
void MsgThread::Info(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::INFO, this, msg));
SendOut(new detail::ReporterMessage(detail::ReporterMessage::INFO, this, msg));
}
void MsgThread::Warning(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::WARNING, this, msg));
SendOut(new detail::ReporterMessage(detail::ReporterMessage::WARNING, this, msg));
}
void MsgThread::Error(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::ERROR, this, msg));
SendOut(new detail::ReporterMessage(detail::ReporterMessage::ERROR, this, msg));
}
void MsgThread::FatalError(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::FATAL_ERROR, this, msg));
SendOut(new detail::ReporterMessage(detail::ReporterMessage::FATAL_ERROR, this, msg));
}
void MsgThread::FatalErrorWithCore(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::FATAL_ERROR_WITH_CORE, this, msg));
SendOut(new detail::ReporterMessage(detail::ReporterMessage::FATAL_ERROR_WITH_CORE, this, msg));
}
void MsgThread::InternalWarning(const char* msg)
{
SendOut(new ReporterMessage(ReporterMessage::INTERNAL_WARNING, this, msg));
SendOut(new detail::ReporterMessage(detail::ReporterMessage::INTERNAL_WARNING, this, msg));
}
void MsgThread::InternalError(const char* msg)
@ -353,7 +352,7 @@ void MsgThread::InternalError(const char* msg)
void MsgThread::Debug(zeek::DebugStream stream, const char* msg)
{
SendOut(new DebugMessage(stream, this, msg));
SendOut(new detail::DebugMessage(stream, this, msg));
}
#endif
@ -390,7 +389,7 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
void MsgThread::SendEvent(const char* name, const int num_vals, Value* *vals)
{
SendOut(new SendEventMessage(this, name, num_vals, vals));
SendOut(new detail::SendEventMessage(this, name, num_vals, vals));
}
BasicOutputMessage* MsgThread::RetrieveOut()
@ -440,7 +439,7 @@ void MsgThread::Run()
// after all other outgoing messages (in particular
// error messages have been processed by then main
// thread).
SendOut(new KillMeMessage(this));
SendOut(new detail::KillMeMessage(this));
failed = true;
}
}
@ -483,3 +482,5 @@ void MsgThread::Process()
delete msg;
}
}
} // namespace zeek::threading

View file

@ -8,13 +8,29 @@
#include "iosource/IOSource.h"
#include "Flare.h"
namespace zeek::threading {
struct Value;
struct Field;
}
namespace threading {
using Value [[deprecated("Remove in v4.1. Use zeek::threading::Value.")]] = zeek::threading::Value;
using Field [[deprecated("Remove in v4.1. Use zeek::threading::Field.")]] = zeek::threading::Field;
}
namespace zeek::threading {
class BasicInputMessage;
class BasicOutputMessage;
namespace detail {
// These classes are marked as friends later so they need to be forward declared.
class HeartbeatMessage;
struct Value;
struct Field;
class FinishMessage;
class FinishedMessage;
class KillMeMessage;
}
/**
* A specialized thread that provides bi-directional message passing between
@ -205,10 +221,10 @@ public:
protected:
friend class Manager;
friend class HeartbeatMessage;
friend class FinishMessage;
friend class FinishedMessage;
friend class KillMeMessage;
friend class detail::HeartbeatMessage;
friend class detail::FinishMessage;
friend class detail::FinishedMessage;
friend class detail::KillMeMessage;
/**
* Pops a message sent by the child from the child-to-main queue.
@ -464,4 +480,13 @@ private:
O* object;
};
}
} // namespace zeek::threading
namespace threading {
using MsgThread [[deprecated("Remove in v4.1. Use zeek::threading::MsgThread.")]] = zeek::threading::MsgThread;
using Message [[deprecated("Remove in v4.1. Use zeek::threading::Message.")]] = zeek::threading::Message;
using BasicInputMessage [[deprecated("Remove in v4.1. Use zeek::threading::BasicInputMessage.")]] = zeek::threading::BasicInputMessage;
using BasicOutputMessage [[deprecated("Remove in v4.1. Use zeek::threading::BasicOutputMessage.")]] = zeek::threading::BasicOutputMessage;
template<typename O> using InputMessage [[deprecated("Remove in v4.1. Use zeek::threading::InputMessage.")]] = zeek::threading::InputMessage<O>;
template<typename O> using OutputMessage [[deprecated("Remove in v4.1. Use zeek::threading::OutputMessage.")]] = zeek::threading::OutputMessage<O>;
} // namespace threading

View file

@ -12,7 +12,7 @@
#undef Queue // Defined elsewhere unfortunately.
namespace threading {
namespace zeek::threading {
/**
* A thread-safe single-reader single-writer queue.
@ -261,4 +261,8 @@ inline void Queue<T>::WakeUp()
}
}
} // namespace zeek::threading
namespace threading {
template<typename T> using Queue [[deprecated("Remove in v4.1. Use zeek::threading::Queue.")]] = zeek::threading::Queue<T>;
}

View file

@ -14,7 +14,7 @@
#include "Scope.h"
#include "IPAddr.h"
using namespace threading;
namespace zeek::threading {
bool Field::Read(zeek::detail::SerializationFormat* fmt)
{
@ -636,3 +636,5 @@ zeek::Val* Value::ValueToVal(const std::string& source, const Value* val, bool&
assert(false);
return nullptr;
}
} // namespace zeek::threading

View file

@ -11,7 +11,7 @@
ZEEK_FORWARD_DECLARE_NAMESPACED(SerializationFormat, zeek::detail);
namespace threading {
namespace zeek::threading {
/**
* Definition of a log file, i.e., one column of a log stream.
@ -214,4 +214,9 @@ private:
Value(const Value& other) = delete;
};
} // namespace zeek::threading
namespace threading {
using Field [[deprecated("Remove in v4.1. Use zeek::threading::Field.")]] = zeek::threading::Field;
using Value [[deprecated("Remove in v4.1. Use zeek::threading::Value.")]] = zeek::threading::Value;
}

View file

@ -10,7 +10,8 @@
#include <errno.h>
using namespace std;
using namespace threading::formatter;
namespace zeek::threading::formatter {
// If the value we'd write out would match exactly the a reserved string, we
// escape the first character so that the output won't be ambigious. If this
@ -47,7 +48,7 @@ Ascii::SeparatorInfo::SeparatorInfo(const string& arg_separator,
empty_field = arg_empty_field;
}
Ascii::Ascii(threading::MsgThread* t, const SeparatorInfo& info) : Formatter(t)
Ascii::Ascii(zeek::threading::MsgThread* t, const SeparatorInfo& info) : zeek::threading::Formatter(t)
{
separators = info;
}
@ -56,8 +57,8 @@ Ascii::~Ascii()
{
}
bool Ascii::Describe(zeek::ODesc* desc, int num_fields, const threading::Field* const * fields,
threading::Value** vals) const
bool Ascii::Describe(zeek::ODesc* desc, int num_fields, const zeek::threading::Field* const * fields,
zeek::threading::Value** vals) const
{
for ( int i = 0; i < num_fields; i++ )
{
@ -71,7 +72,7 @@ bool Ascii::Describe(zeek::ODesc* desc, int num_fields, const threading::Field*
return true;
}
bool Ascii::Describe(zeek::ODesc* desc, threading::Value* val, const string& name) const
bool Ascii::Describe(zeek::ODesc* desc, zeek::threading::Value* val, const string& name) const
{
if ( ! val->present )
{
@ -207,12 +208,12 @@ bool Ascii::Describe(zeek::ODesc* desc, threading::Value* val, const string& nam
}
threading::Value* Ascii::ParseValue(const string& s, const string& name, zeek::TypeTag type, zeek::TypeTag subtype) const
zeek::threading::Value* Ascii::ParseValue(const string& s, const string& name, zeek::TypeTag type, zeek::TypeTag subtype) const
{
if ( ! separators.unset_field.empty() && s.compare(separators.unset_field) == 0 ) // field is not set...
return new threading::Value(type, false);
return new zeek::threading::Value(type, false);
threading::Value* val = new threading::Value(type, subtype, true);
zeek::threading::Value* val = new zeek::threading::Value(type, subtype, true);
const char* start = s.c_str();
char* end = nullptr;
errno = 0;
@ -373,7 +374,7 @@ threading::Value* Ascii::ParseValue(const string& s, const string& name, zeek::T
if ( separators.empty_field.empty() && s.empty() )
length = 0;
threading::Value** lvals = new threading::Value* [length];
zeek::threading::Value** lvals = new zeek::threading::Value* [length];
if ( type == zeek::TYPE_TABLE )
{
@ -409,7 +410,7 @@ threading::Value* Ascii::ParseValue(const string& s, const string& name, zeek::T
break;
}
threading::Value* newval = ParseValue(element, name, subtype);
zeek::threading::Value* newval = ParseValue(element, name, subtype);
if ( newval == nullptr )
{
GetThread()->Warning("Error while reading set or vector");
@ -474,7 +475,7 @@ parse_error:
bool Ascii::CheckNumberError(const char* start, const char* end) const
{
threading::MsgThread* thread = GetThread();
zeek::threading::MsgThread* thread = GetThread();
if ( end == start && *end != '\0' ) {
thread->Warning(thread->Fmt("String '%s' contained no parseable number", start));
@ -504,3 +505,5 @@ bool Ascii::CheckNumberError(const char* start, const char* end) const
return false;
}
} // namespace zeek::threading::formatter

View file

@ -4,9 +4,9 @@
#include "../Formatter.h"
namespace threading { namespace formatter {
namespace zeek::threading::formatter {
class Ascii final : public Formatter {
class Ascii final : public zeek::threading::Formatter {
public:
/**
* A struct to pass the necessary configuration values to the
@ -44,14 +44,14 @@ public:
* @param info SeparatorInfo structure defining the necessary
* separators.
*/
Ascii(threading::MsgThread* t, const SeparatorInfo& info);
Ascii(zeek::threading::MsgThread* t, const SeparatorInfo& info);
virtual ~Ascii();
virtual bool Describe(zeek::ODesc* desc, threading::Value* val, const std::string& name = "") const;
virtual bool Describe(zeek::ODesc* desc, int num_fields, const threading::Field* const * fields,
threading::Value** vals) const;
virtual threading::Value* ParseValue(const std::string& s, const std::string& name,
zeek::TypeTag type, zeek::TypeTag subtype = zeek::TYPE_ERROR) const;
virtual bool Describe(zeek::ODesc* desc, zeek::threading::Value* val, const std::string& name = "") const;
virtual bool Describe(zeek::ODesc* desc, int num_fields, const zeek::threading::Field* const * fields,
zeek::threading::Value** vals) const;
virtual zeek::threading::Value* ParseValue(const std::string& s, const std::string& name,
zeek::TypeTag type, zeek::TypeTag subtype = zeek::TYPE_ERROR) const;
private:
bool CheckNumberError(const char* start, const char* end) const;
@ -59,4 +59,8 @@ private:
SeparatorInfo separators;
};
}}
} // namespace zeek::threading::formatter
namespace threading::formatter {
using Ascii [[deprecated("Remove in v4.1. Use zeek::threading::formatter::Ascii.")]] = zeek::threading::formatter::Ascii;
}

View file

@ -16,7 +16,7 @@
#include <math.h>
#include <stdint.h>
using namespace threading::formatter;
namespace zeek::threading::formatter {
bool JSON::NullDoubleWriter::Double(double d)
{
@ -26,7 +26,7 @@ bool JSON::NullDoubleWriter::Double(double d)
return rapidjson::Writer<rapidjson::StringBuffer>::Double(d);
}
JSON::JSON(MsgThread* t, TimeFormat tf) : Formatter(t), surrounding_braces(true)
JSON::JSON(zeek::threading::MsgThread* t, TimeFormat tf) : zeek::threading::Formatter(t), surrounding_braces(true)
{
timestamps = tf;
}
@ -35,8 +35,8 @@ JSON::~JSON()
{
}
bool JSON::Describe(zeek::ODesc* desc, int num_fields, const Field* const * fields,
Value** vals) const
bool JSON::Describe(zeek::ODesc* desc, int num_fields, const zeek::threading::Field* const * fields,
zeek::threading::Value** vals) const
{
rapidjson::StringBuffer buffer;
NullDoubleWriter writer(buffer);
@ -55,7 +55,7 @@ bool JSON::Describe(zeek::ODesc* desc, int num_fields, const Field* const * fiel
return true;
}
bool JSON::Describe(zeek::ODesc* desc, Value* val, const std::string& name) const
bool JSON::Describe(zeek::ODesc* desc, zeek::threading::Value* val, const std::string& name) const
{
if ( desc->IsBinary() )
{
@ -78,13 +78,14 @@ bool JSON::Describe(zeek::ODesc* desc, Value* val, const std::string& name) cons
return true;
}
threading::Value* JSON::ParseValue(const std::string& s, const std::string& name, zeek::TypeTag type, zeek::TypeTag subtype) const
zeek::threading::Value* JSON::ParseValue(const std::string& s, const std::string& name,
zeek::TypeTag type, zeek::TypeTag subtype) const
{
GetThread()->Error("JSON formatter does not support parsing yet.");
return nullptr;
}
void JSON::BuildJSON(NullDoubleWriter& writer, Value* val, const std::string& name) const
void JSON::BuildJSON(NullDoubleWriter& writer, zeek::threading::Value* val, const std::string& name) const
{
if ( ! val->present )
{
@ -204,3 +205,5 @@ void JSON::BuildJSON(NullDoubleWriter& writer, Value* val, const std::string& na
break;
}
}
} // namespace zeek::threading::formatter

View file

@ -8,13 +8,13 @@
#include "../Formatter.h"
namespace threading { namespace formatter {
namespace zeek::threading::formatter {
/**
* A thread-safe class for converting values into a JSON representation
* and vice versa.
*/
class JSON : public Formatter {
class JSON : public zeek::threading::Formatter {
public:
enum TimeFormat {
TS_EPOCH, // Doubles that represents seconds from the UNIX epoch.
@ -22,14 +22,14 @@ public:
TS_MILLIS // Milliseconds from the UNIX epoch. Some consumers need this (e.g., elasticsearch).
};
JSON(threading::MsgThread* t, TimeFormat tf);
JSON(zeek::threading::MsgThread* t, TimeFormat tf);
~JSON() override;
bool Describe(zeek::ODesc* desc, threading::Value* val, const std::string& name = "") const override;
bool Describe(zeek::ODesc* desc, int num_fields, const threading::Field* const * fields,
threading::Value** vals) const override;
threading::Value* ParseValue(const std::string& s, const std::string& name, zeek::TypeTag type,
zeek::TypeTag subtype = zeek::TYPE_ERROR) const override;
bool Describe(zeek::ODesc* desc, zeek::threading::Value* val, const std::string& name = "") const override;
bool Describe(zeek::ODesc* desc, int num_fields, const zeek::threading::Field* const * fields,
zeek::threading::Value** vals) const override;
zeek::threading::Value* ParseValue(const std::string& s, const std::string& name, zeek::TypeTag type,
zeek::TypeTag subtype = zeek::TYPE_ERROR) const override;
class NullDoubleWriter : public rapidjson::Writer<rapidjson::StringBuffer> {
public:
@ -38,10 +38,14 @@ public:
};
private:
void BuildJSON(NullDoubleWriter& writer, Value* val, const std::string& name = "") const;
void BuildJSON(NullDoubleWriter& writer, zeek::threading::Value* val, const std::string& name = "") const;
TimeFormat timestamps;
bool surrounding_braces;
};
}}
} // namespace zeek::threading::formatter
namespace threading::formatter {
using JSON [[deprecated("Remove in v4.1. Use zeek::threading::formatter::JSON.")]] = zeek::threading::formatter::JSON;
}