Reformat the world

This commit is contained in:
Tim Wojtulewicz 2021-09-16 15:35:39 -07:00
parent 194cb24547
commit b2f171ec69
714 changed files with 35149 additions and 35203 deletions

View file

@ -1,13 +1,14 @@
#include "zeek/zeek-config.h"
#include "zeek/threading/BasicThread.h"
#include <signal.h>
#include <pthread.h>
#include <signal.h>
#include "zeek/threading/Manager.h"
#include "zeek/util.h"
#include "zeek/zeek-config.h"
namespace zeek::threading {
namespace zeek::threading
{
static const int STD_FMT_BUF_LEN = 2048;
@ -20,7 +21,7 @@ BasicThread::BasicThread()
killed = false;
buf_len = STD_FMT_BUF_LEN;
buf = (char*) util::safe_malloc(buf_len);
buf = (char*)util::safe_malloc(buf_len);
strerr_buffer = nullptr;
@ -34,19 +35,20 @@ BasicThread::~BasicThread()
if ( buf )
free(buf);
delete [] name;
delete [] strerr_buffer;
delete[] name;
delete[] strerr_buffer;
}
void BasicThread::SetName(const char* arg_name)
{
delete [] name;
delete[] name;
name = util::copy_string(arg_name);
}
void BasicThread::SetOSName(const char* arg_name)
{
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value, "libstdc++ doesn't use pthread_t");
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value,
"libstdc++ doesn't use pthread_t");
util::detail::set_thread_name(arg_name, thread.native_handle());
}
@ -55,7 +57,7 @@ const char* BasicThread::Fmt(const char* format, ...)
if ( buf_len > 10 * STD_FMT_BUF_LEN )
{
// Shrink back to normal.
buf = (char*) util::safe_realloc(buf, STD_FMT_BUF_LEN);
buf = (char*)util::safe_realloc(buf, STD_FMT_BUF_LEN);
buf_len = STD_FMT_BUF_LEN;
}
@ -64,10 +66,10 @@ const char* BasicThread::Fmt(const char* format, ...)
int n = vsnprintf(buf, buf_len, format, al);
va_end(al);
if ( (unsigned int) n >= buf_len )
if ( (unsigned int)n >= buf_len )
{ // Not enough room, grow the buffer.
buf_len = n + 32;
buf = (char*) util::safe_realloc(buf, buf_len);
buf = (char*)util::safe_realloc(buf, buf_len);
// Is it portable to restart?
va_start(al, format);
@ -119,7 +121,8 @@ void BasicThread::WaitForStop()
if ( ! started )
return;
DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate and process last queue items...", name);
DBG_LOG(DBG_THREADING, "Waiting for thread %s to terminate and process last queue items...",
name);
OnWaitForStop();
@ -166,10 +169,11 @@ void BasicThread::Done()
killed = true;
}
void* BasicThread::launcher(void *arg)
void* BasicThread::launcher(void* arg)
{
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value, "libstdc++ doesn't use pthread_t");
BasicThread* thread = (BasicThread *)arg;
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value,
"libstdc++ doesn't use pthread_t");
BasicThread* thread = (BasicThread*)arg;
// Block signals in thread. We handle signals only in the main
// process.
@ -194,4 +198,4 @@ void* BasicThread::launcher(void *arg)
return nullptr;
}
} // namespace zeek::threading
} // namespace zeek::threading

View file

@ -1,14 +1,14 @@
#pragma once
#include "zeek/zeek-config.h"
#include <stdint.h>
#include <iosfwd>
#include <thread>
namespace zeek::threading {
#include "zeek/zeek-config.h"
namespace zeek::threading
{
class Manager;
@ -21,7 +21,7 @@ class Manager;
* manually).
*/
class BasicThread
{
{
public:
/**
* Creates a new thread object. Instantiating the object does however
@ -35,7 +35,7 @@ public:
BasicThread();
BasicThread(BasicThread const&) = delete;
BasicThread& operator =(BasicThread const&) = delete;
BasicThread& operator=(BasicThread const&) = delete;
/**
* Returns a descriptive name for the thread. If not set via
@ -46,13 +46,13 @@ public:
const char* Name() const { return name; }
/**
* Sets a descriptive name for the thread. This should be a string
* that's useful in output presented to the user and uniquely
* identifies the thread.
*
* This method must be called only from main thread at initialization
* time.
*/
* Sets a descriptive name for the thread. This should be a string
* that's useful in output presented to the user and uniquely
* identifies the thread.
*
* This method must be called only from main thread at initialization
* time.
*/
void SetName(const char* name);
/**
@ -103,14 +103,14 @@ public:
*
* This method is safe to call from any thread.
*/
bool Terminating() const { return terminating; }
bool Terminating() const { return terminating; }
/**
* Returns true if Kill() has been called.
*
* This method is safe to call from any thread.
*/
bool Killed() const { return killed; }
bool Killed() const { return killed; }
/**
* A version of zeek::util::fmt() that the thread can safely use.
@ -118,7 +118,8 @@ public:
* This is safe to call from Run() but must not be used from any
* other thread than the current one.
*/
const char* Fmt(const char* format, ...) __attribute__((format(printf, 2, 3)));;
const char* Fmt(const char* format, ...) __attribute__((format(printf, 2, 3)));
;
/**
* A version of strerror() that the thread can safely use. This is
@ -145,14 +146,14 @@ protected:
* will be called from Bro's main thread after the OS thread has been
* started.
*/
virtual void OnStart() {}
virtual void OnStart() { }
/**
* Executed with SignalStop(). This is a hook into preparing the
* thread for stopping. It will be called from Bro's main thread
* before the thread has been signaled to stop.
*/
virtual void OnSignalStop() {}
virtual void OnSignalStop() { }
/**
* Executed with WaitForStop(). This is a hook into waiting for the
@ -165,7 +166,7 @@ protected:
/**
* Executed with Kill(). This is a hook into killing the thread.
*/
virtual void OnKill() {}
virtual void OnKill() { }
/**
* Destructor. This will be called by the manager.
@ -191,17 +192,17 @@ protected:
void Kill();
/** Called by child thread's launcher when it's done processing. */
ZEEK_DISABLE_TSAN void Done();
ZEEK_DISABLE_TSAN void Done();
private:
// thread entry function.
static void* launcher(void *arg);
static void* launcher(void* arg);
const char* name;
std::thread thread;
bool started; // Set to to true once running.
bool terminating; // Set to to true to signal termination.
bool killed; // Set to true once forcefully killed.
bool started; // Set to to true once running.
bool terminating; // Set to to true to signal termination.
bool killed; // Set to true once forcefully killed.
// For implementing Fmt().
uint32_t buf_len;
@ -211,6 +212,6 @@ private:
char* strerr_buffer;
static uint64_t thread_counter;
};
};
} // namespace zeek::threading
} // namespace zeek::threading

View file

@ -1,26 +1,25 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/zeek-config.h"
#include "zeek/threading/Formatter.h"
#include <errno.h>
#include "zeek/threading/MsgThread.h"
#include "zeek/bro_inet_ntop.h"
#include "zeek/threading/MsgThread.h"
#include "zeek/zeek-config.h"
using zeek::threading::Value;
using zeek::threading::Field;
using zeek::threading::Value;
namespace zeek::threading {
namespace zeek::threading
{
Formatter::Formatter(threading::MsgThread* t)
{
thread = t;
}
Formatter::~Formatter()
{
}
Formatter::~Formatter() { }
std::string Formatter::Render(const threading::Value::addr_t& addr)
{
@ -44,7 +43,7 @@ std::string Formatter::Render(const threading::Value::addr_t& addr)
}
}
TransportProto Formatter::ParseProto(const std::string &proto) const
TransportProto Formatter::ParseProto(const std::string& proto) const
{
if ( proto == "unknown" )
return TRANSPORT_UNKNOWN;
@ -60,9 +59,8 @@ TransportProto Formatter::ParseProto(const std::string &proto) const
return TRANSPORT_UNKNOWN;
}
// More or less verbose copy from IPAddr.cc -- which uses reporter.
threading::Value::addr_t Formatter::ParseAddr(const std::string &s) const
threading::Value::addr_t Formatter::ParseAddr(const std::string& s) const
{
threading::Value::addr_t val;
@ -126,4 +124,4 @@ std::string Formatter::Render(TransportProto proto)
return "unknown";
}
} // namespace zeek::threading::formatter
} // namespace zeek::threading::formatter

View file

@ -7,16 +7,18 @@
#include "zeek/Type.h"
#include "zeek/threading/SerialTypes.h"
namespace zeek::threading {
namespace zeek::threading
{
class MsgThread;
/**
* A thread-safe class for converting values into some textual format. This
* is a base class that implements the interface for common
* rendering/parsing code needed by a number of input/output threads.
*/
class Formatter {
* A thread-safe class for converting values into some textual format. This
* is a base class that implements the interface for common
* rendering/parsing code needed by a number of input/output threads.
*/
class Formatter
{
public:
/**
* Constructor.
@ -49,7 +51,7 @@ public:
* @return Returns true on success, false on error. Errors must also
* be flagged via the thread.
*/
virtual bool Describe(ODesc* desc, int num_fields, const Field* const * fields,
virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields,
Value** vals) const = 0;
/**
@ -138,7 +140,7 @@ public:
* @return The transport protocol, which will be \c TRANSPORT_UNKNOWN
* on error. Errors are also flagged via the thread.
*/
TransportProto ParseProto(const std::string &proto) const;
TransportProto ParseProto(const std::string& proto) const;
/**
* Convert a string into a Value::addr_t.
@ -150,17 +152,17 @@ public:
* @return The address, which will be all-zero on error. Errors are
* also flagged via the thread.
*/
Value::addr_t ParseAddr(const std::string &addr) const;
Value::addr_t ParseAddr(const std::string& addr) const;
protected:
/**
* Returns the thread associated with the formatter via the
* constructor.
*/
MsgThread* GetThread() const { return thread; }
MsgThread* GetThread() const { return thread; }
private:
MsgThread* thread;
};
};
} // namespace zeek::threading
} // namespace zeek::threading

View file

@ -3,14 +3,16 @@
#include <sys/socket.h>
#include <unistd.h>
#include "zeek/NetVar.h"
#include "zeek/iosource/Manager.h"
#include "zeek/Event.h"
#include "zeek/IPAddr.h"
#include "zeek/NetVar.h"
#include "zeek/RunState.h"
#include "zeek/iosource/Manager.h"
namespace zeek::threading {
namespace detail {
namespace zeek::threading
{
namespace detail
{
void HeartbeatTimer::Dispatch(double t, bool is_expire)
{
@ -21,7 +23,7 @@ void HeartbeatTimer::Dispatch(double t, bool is_expire)
thread_mgr->StartHeartbeatTimer();
}
} // namespace detail
} // namespace detail
Manager::Manager()
{
@ -44,26 +46,28 @@ void Manager::Terminate()
terminating = true;
// First process remaining thread output for the message threads.
do Flush(); while ( did_process );
do
Flush();
while ( did_process );
// Signal all to stop.
// Signal all to stop.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->SignalStop();
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->SignalStop();
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->WaitForStop();
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->WaitForStop();
// Then join them all.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
{
(*i)->Join();
delete *i;
}
// Then join them all.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
{
(*i)->Join();
delete *i;
}
all_threads.clear();
msg_threads.clear();
terminating = false;
all_threads.clear();
msg_threads.clear();
terminating = false;
}
void Manager::AddThread(BasicThread* thread)
@ -87,7 +91,7 @@ void Manager::KillThreads()
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->Kill();
}
}
void Manager::KillThread(BasicThread* thread)
{
@ -118,7 +122,7 @@ void Manager::SendHeartbeats()
all_threads.remove(t);
MsgThread* mt = dynamic_cast<MsgThread *>(t);
MsgThread* mt = dynamic_cast<MsgThread*>(t);
if ( mt )
msg_threads.remove(mt);
@ -131,13 +135,15 @@ void Manager::SendHeartbeats()
void Manager::StartHeartbeatTimer()
{
heartbeat_timer_running = true;
zeek::detail::timer_mgr->Add(new detail::HeartbeatTimer(run_state::network_time + BifConst::Threading::heartbeat_interval));
zeek::detail::timer_mgr->Add(new detail::HeartbeatTimer(
run_state::network_time + BifConst::Threading::heartbeat_interval));
}
// Raise everything in here as warnings so it is passed to scriptland without
// looking "fatal". In addition to these warnings, ReaderBackend will queue
// one reporter message.
bool Manager::SendEvent(MsgThread* thread, const std::string& name, const int num_vals, Value* *vals) const
bool Manager::SendEvent(MsgThread* thread, const std::string& name, const int num_vals,
Value** vals) const
{
EventHandler* handler = event_registry->Lookup(name);
if ( handler == nullptr )
@ -148,15 +154,16 @@ bool Manager::SendEvent(MsgThread* thread, const std::string& name, const int nu
}
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "Thread %s: SendEvent for event %s with %d vals",
thread->Name(), name.c_str(), num_vals);
DBG_LOG(DBG_INPUT, "Thread %s: SendEvent for event %s with %d vals", thread->Name(),
name.c_str(), num_vals);
#endif
const auto& type = handler->GetType()->Params();
int num_event_vals = type->NumFields();
if ( num_vals != num_event_vals )
{
reporter->Warning("Thread %s: Wrong number of values for event %s", thread->Name(), name.c_str());
reporter->Warning("Thread %s: Wrong number of values for event %s", thread->Name(),
name.c_str());
Value::delete_value_ptr_array(vals, num_vals);
return false;
}
@ -166,7 +173,7 @@ bool Manager::SendEvent(MsgThread* thread, const std::string& name, const int nu
Args vl;
vl.reserve(num_vals);
for ( int j = 0; j < num_vals; j++)
for ( int j = 0; j < num_vals; j++ )
{
Val* v = Value::ValueToVal(std::string("thread ") + thread->Name(), vals[j], convert_error);
vl.emplace_back(AdoptRef{}, v);
@ -245,7 +252,7 @@ void Manager::Flush()
all_threads.remove(t);
MsgThread* mt = dynamic_cast<MsgThread *>(t);
MsgThread* mt = dynamic_cast<MsgThread*>(t);
if ( mt )
msg_threads.remove(mt);
@ -254,7 +261,8 @@ void Manager::Flush()
delete t;
}
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", run_state::network_time,
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n",
// run_state::network_time,
// detail::timer_mgr->Time(), do_beat, (int)did_process, next_beat);
}
@ -269,10 +277,10 @@ const threading::Manager::msg_stats_list& threading::Manager::GetMsgThreadStats(
MsgThread::Stats s;
t->GetStats(&s);
stats.push_back(std::make_pair(t->Name(),s));
stats.push_back(std::make_pair(t->Name(), s));
}
return stats;
}
} // namespace zeek::threading
} // namespace zeek::threading

View file

@ -3,26 +3,29 @@
#include <list>
#include <utility>
#include "zeek/threading/MsgThread.h"
#include "zeek/Timer.h"
#include "zeek/threading/MsgThread.h"
namespace zeek {
namespace threading {
namespace detail {
namespace zeek
{
namespace threading
{
namespace detail
{
class HeartbeatTimer final : public zeek::detail::Timer {
class HeartbeatTimer final : public zeek::detail::Timer
{
public:
HeartbeatTimer(double t) : zeek::detail::Timer(t, zeek::detail::TIMER_THREAD_HEARTBEAT) {}
virtual ~HeartbeatTimer() {}
HeartbeatTimer(double t) : zeek::detail::Timer(t, zeek::detail::TIMER_THREAD_HEARTBEAT) { }
virtual ~HeartbeatTimer() { }
void Dispatch(double t, bool is_expire) override;
protected:
void Init();
};
};
} // namespace detail
} // namespace detail
/**
* The thread manager coordinates all child threads. Once a BasicThread is
@ -35,7 +38,7 @@ protected:
* the rest of Bro. It also triggers the regular heartbeats.
*/
class Manager
{
{
public:
/**
* Constructor. Only a single instance of the manager must be
@ -60,9 +63,9 @@ public:
* Returns True if we are currently in Terminate() waiting for
* threads to exit.
*/
bool Terminating() const { return terminating; }
bool Terminating() const { return terminating; }
typedef std::list<std::pair<std::string, MsgThread::Stats> > msg_stats_list;
typedef std::list<std::pair<std::string, MsgThread::Stats>> msg_stats_list;
/**
* Returns statistics from all current MsgThread instances.
@ -101,7 +104,8 @@ public:
* @param vals Values passed to the event
* @returns True on success false on failure.
*/
bool SendEvent(MsgThread* thread, const std::string& name, const int num_vals, Value* *vals) const;
bool SendEvent(MsgThread* thread, const std::string& name, const int num_vals,
Value** vals) const;
protected:
friend class BasicThread;
@ -145,16 +149,16 @@ private:
typedef std::list<MsgThread*> msg_thread_list;
msg_thread_list msg_threads;
bool did_process; // True if the last Process() found some work to do.
double next_beat; // Timestamp when the next heartbeat will be sent.
bool terminating; // True if we are in Terminate().
bool did_process; // True if the last Process() found some work to do.
double next_beat; // Timestamp when the next heartbeat will be sent.
bool terminating; // True if we are in Terminate().
msg_stats_list stats;
bool heartbeat_timer_running = false;
};
};
} // namespace threading
} // namespace threading
/**
* A singleton instance of the thread manager. All methods must only be
@ -162,4 +166,4 @@ private:
*/
extern threading::Manager* thread_mgr;
} // namespace zeek
} // namespace zeek

View file

@ -1,100 +1,113 @@
#include "zeek/threading/MsgThread.h"
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include "zeek/DebugLogger.h"
#include "zeek/threading/Manager.h"
#include "zeek/iosource/Manager.h"
#include "zeek/RunState.h"
#include "zeek/iosource/Manager.h"
#include "zeek/threading/Manager.h"
// Set by Zeek's main signal handler.
extern int signal_val;
namespace zeek::threading {
namespace detail {
namespace zeek::threading
{
namespace detail
{
////// Messages.
// Signals child thread to shutdown operation.
class FinishMessage final : public InputMessage<MsgThread>
{
{
public:
FinishMessage(MsgThread* thread, double network_time) : InputMessage<MsgThread>("Finish", thread),
network_time(network_time) { }
FinishMessage(MsgThread* thread, double network_time)
: InputMessage<MsgThread>("Finish", thread), network_time(network_time)
{
}
bool Process() override {
bool Process() override
{
if ( Object()->child_finished )
return true;
bool result = Object()->OnFinish(network_time);
Object()->Finished();
return result;
}
}
private:
double network_time;
};
};
// Signals main thread that operations shut down.
class FinishedMessage final : public OutputMessage<MsgThread>
{
{
public:
FinishedMessage(MsgThread* thread)
: OutputMessage<MsgThread>("FinishedMessage", thread)
{ }
FinishedMessage(MsgThread* thread) : OutputMessage<MsgThread>("FinishedMessage", thread) { }
bool Process() override {
bool Process() override
{
Object()->main_finished = true;
return true;
}
};
}
};
/// Sends a heartbeat to the child thread.
class HeartbeatMessage final : public InputMessage<MsgThread>
{
{
public:
HeartbeatMessage(MsgThread* thread, double arg_network_time, double arg_current_time)
: InputMessage<MsgThread>("Heartbeat", thread)
{ network_time = arg_network_time; current_time = arg_current_time; }
{
network_time = arg_network_time;
current_time = arg_current_time;
}
bool Process() override {
return Object()->OnHeartbeat(network_time, current_time);
}
bool Process() override { return Object()->OnHeartbeat(network_time, current_time); }
private:
double network_time;
double current_time;
};
};
// A message from the child to be passed on to the Reporter.
class ReporterMessage final : public OutputMessage<MsgThread>
{
{
public:
enum Type {
INFO, WARNING, ERROR, FATAL_ERROR, FATAL_ERROR_WITH_CORE,
INTERNAL_WARNING, INTERNAL_ERROR
};
enum Type
{
INFO,
WARNING,
ERROR,
FATAL_ERROR,
FATAL_ERROR_WITH_CORE,
INTERNAL_WARNING,
INTERNAL_ERROR
};
ReporterMessage(Type arg_type, MsgThread* thread, const char* arg_msg)
: OutputMessage<MsgThread>("ReporterMessage", thread)
{ type = arg_type; msg = util::copy_string(arg_msg); }
{
type = arg_type;
msg = util::copy_string(arg_msg);
}
~ReporterMessage() override { delete [] msg; }
~ReporterMessage() override { delete[] msg; }
bool Process() override;
private:
const char* msg;
Type type;
};
};
// A message from the the child to the main process, requesting suicide.
class KillMeMessage final : public OutputMessage<MsgThread>
{
{
public:
KillMeMessage(MsgThread* thread)
: OutputMessage<MsgThread>("ReporterMessage", thread) {}
KillMeMessage(MsgThread* thread) : OutputMessage<MsgThread>("ReporterMessage", thread) { }
bool Process() override
{
@ -103,38 +116,45 @@ public:
thread_mgr->KillThread(Object());
return true;
}
};
};
#ifdef DEBUG
// A debug message from the child to be passed on to the DebugLogger.
class DebugMessage final : public OutputMessage<MsgThread>
{
{
public:
DebugMessage(DebugStream arg_stream, MsgThread* thread, const char* arg_msg)
: OutputMessage<MsgThread>("DebugMessage", thread)
{ stream = arg_stream; msg = util::copy_string(arg_msg); }
{
stream = arg_stream;
msg = util::copy_string(arg_msg);
}
~DebugMessage() override { delete [] msg; }
~DebugMessage() override { delete[] msg; }
bool Process() override
{
zeek::detail::debug_logger.Log(stream, "%s: %s", Object()->Name(), msg);
return true;
}
private:
const char* msg;
DebugStream stream;
};
};
#endif
// An event that the child wants to pass into the main event queue
class SendEventMessage final : public OutputMessage<MsgThread> {
class SendEventMessage final : public OutputMessage<MsgThread>
{
public:
SendEventMessage(MsgThread* thread, const char* name, const int num_vals, Value* *val)
: OutputMessage<MsgThread>("SendEvent", thread),
name(util::copy_string(name)), num_vals(num_vals), val(val) {}
SendEventMessage(MsgThread* thread, const char* name, const int num_vals, Value** val)
: OutputMessage<MsgThread>("SendEvent", thread), name(util::copy_string(name)),
num_vals(num_vals), val(val)
{
}
~SendEventMessage() override { delete [] name; }
~SendEventMessage() override { delete[] name; }
bool Process() override
{
@ -149,55 +169,56 @@ public:
private:
const char* name;
const int num_vals;
Value* *val;
};
Value** val;
};
bool ReporterMessage::Process()
{
switch ( type ) {
switch ( type )
{
case INFO:
reporter->Info("%s: %s", Object()->Name(), msg);
break;
case INFO:
reporter->Info("%s: %s", Object()->Name(), msg);
break;
case WARNING:
reporter->Warning("%s: %s", Object()->Name(), msg);
break;
case WARNING:
reporter->Warning("%s: %s", Object()->Name(), msg);
break;
case ERROR:
reporter->Error("%s: %s", Object()->Name(), msg);
break;
case ERROR:
reporter->Error("%s: %s", Object()->Name(), msg);
break;
case FATAL_ERROR:
reporter->FatalError("%s: %s", Object()->Name(), msg);
break;
case FATAL_ERROR:
reporter->FatalError("%s: %s", Object()->Name(), msg);
break;
case FATAL_ERROR_WITH_CORE:
reporter->FatalErrorWithCore("%s: %s", Object()->Name(), msg);
break;
case FATAL_ERROR_WITH_CORE:
reporter->FatalErrorWithCore("%s: %s", Object()->Name(), msg);
break;
case INTERNAL_WARNING:
reporter->InternalWarning("%s: %s", Object()->Name(), msg);
break;
case INTERNAL_WARNING:
reporter->InternalWarning("%s: %s", Object()->Name(), msg);
break;
case INTERNAL_ERROR :
reporter->InternalError("%s: %s", Object()->Name(), msg);
break;
case INTERNAL_ERROR:
reporter->InternalError("%s: %s", Object()->Name(), msg);
break;
default:
reporter->InternalError("unknown ReporterMessage type %d", type);
}
default:
reporter->InternalError("unknown ReporterMessage type %d", type);
}
return true;
}
} // namespace detail
} // namespace detail
////// Methods.
Message::~Message()
{
delete [] name;
delete[] name;
}
MsgThread::MsgThread() : BasicThread(), queue_in(this, nullptr), queue_out(nullptr, this)
@ -255,14 +276,18 @@ void MsgThread::OnWaitForStop()
{
// Abort all threads here so that we won't hang next
// on another one.
fprintf(stderr, "received signal while waiting for thread %s, aborting all ...\n", Name());
fprintf(stderr, "received signal while waiting for thread %s, aborting all ...\n",
Name());
thread_mgr->KillThreads();
}
else
{
// More than one signal. Abort processing
// right away. on another one.
fprintf(stderr, "received another signal while waiting for thread %s, aborting processing\n", Name());
fprintf(
stderr,
"received another signal while waiting for thread %s, aborting processing\n",
Name());
exit(1);
}
@ -275,7 +300,7 @@ void MsgThread::OnWaitForStop()
while ( HasOut() )
{
Message* msg = RetrieveOut();
assert ( msg );
assert(msg);
if ( ! msg->Process() )
reporter->Error("%s failed during thread termination", msg->Name());
@ -374,7 +399,6 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force)
++cnt_sent_in;
}
void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
{
if ( Terminating() && ! force )
@ -390,7 +414,7 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
flare.Fire();
}
void MsgThread::SendEvent(const char* name, const int num_vals, Value* *vals)
void MsgThread::SendEvent(const char* name, const int num_vals, Value** vals)
{
SendOut(new detail::SendEventMessage(this, name, num_vals, vals));
}
@ -401,7 +425,7 @@ BasicOutputMessage* MsgThread::RetrieveOut()
if ( ! msg )
return nullptr;
DBG_LOG(DBG_THREADING, "Retrieved '%s' from %s", msg->Name(), Name());
DBG_LOG(DBG_THREADING, "Retrieved '%s' from %s", msg->Name(), Name());
return msg;
}
@ -414,7 +438,7 @@ BasicInputMessage* MsgThread::RetrieveIn()
return nullptr;
#ifdef DEBUG
std::string s = Fmt("Retrieved '%s' in %s", msg->Name(), Name());
std::string s = Fmt("Retrieved '%s' in %s", msg->Name(), Name());
Debug(DBG_THREADING, s.c_str());
#endif
@ -423,7 +447,7 @@ BasicInputMessage* MsgThread::RetrieveIn()
void MsgThread::Run()
{
while ( ! (child_finished || Killed() ) )
while ( ! (child_finished || Killed()) )
{
BasicInputMessage* msg = RetrieveIn();
@ -486,4 +510,4 @@ void MsgThread::Process()
}
}
} // namespace zeek::threading
} // namespace zeek::threading

View file

@ -3,19 +3,21 @@
#include <atomic>
#include "zeek/DebugLogger.h"
#include "zeek/Flare.h"
#include "zeek/iosource/IOSource.h"
#include "zeek/threading/BasicThread.h"
#include "zeek/threading/Queue.h"
#include "zeek/iosource/IOSource.h"
#include "zeek/Flare.h"
namespace zeek::threading {
namespace zeek::threading
{
struct Value;
struct Field;
class BasicInputMessage;
class BasicOutputMessage;
namespace detail {
namespace detail
{
// These classes are marked as friends later so they need to be forward declared.
class HeartbeatMessage;
@ -23,7 +25,7 @@ class FinishMessage;
class FinishedMessage;
class KillMeMessage;
}
}
/**
* A specialized thread that provides bi-directional message passing between
@ -37,7 +39,7 @@ class KillMeMessage;
* processes all remaining ones still in the queue, and then exits.
*/
class MsgThread : public BasicThread, public iosource::IOSource
{
{
public:
/**
* Constructor. It automatically registers the thread with the
@ -60,7 +62,7 @@ public:
*
* @param msg The message.
*/
void SendIn(BasicInputMessage* msg) { return SendIn(msg, false); }
void SendIn(BasicInputMessage* msg) { return SendIn(msg, false); }
/**
* Sends a message from the child thread to the main thread.
@ -69,7 +71,7 @@ public:
*
* @param msg The mesasge.
*/
void SendOut(BasicOutputMessage* msg) { return SendOut(msg, false); }
void SendOut(BasicOutputMessage* msg) { return SendOut(msg, false); }
/**
* Allows the child thread to send a specified Zeek event. The given Vals
@ -81,7 +83,7 @@ public:
*
* @param vals the values to be given to the event
*/
void SendEvent(const char* name, const int num_vals, Value* *vals);
void SendEvent(const char* name, const int num_vals, Value** vals);
/**
* Reports an informational message from the child thread. The main
@ -187,14 +189,15 @@ public:
*/
struct Stats
{
uint64_t sent_in; //! Number of messages sent to the child thread.
uint64_t sent_out; //! Number of messages sent from the child thread to the main thread
uint64_t pending_in; //! Number of messages sent to the child but not yet processed.
uint64_t pending_out; //! Number of messages sent from the child but not yet processed by the main thread.
uint64_t sent_in; //! Number of messages sent to the child thread.
uint64_t sent_out; //! Number of messages sent from the child thread to the main thread
uint64_t pending_in; //! Number of messages sent to the child but not yet processed.
uint64_t pending_out; //! Number of messages sent from the child but not yet processed by
//! the main thread.
/// Statistics from our queues.
Queue<BasicInputMessage *>::Stats queue_in_stats;
Queue<BasicOutputMessage *>::Stats queue_out_stats;
Queue<BasicInputMessage*>::Stats queue_in_stats;
Queue<BasicOutputMessage*>::Stats queue_out_stats;
};
/**
@ -243,9 +246,9 @@ protected:
virtual void Heartbeat();
/** Returns true if a child command has reported a failure. In that case, we'll
* be in the process of killing this thread and no further activity
* should carried out. To be called only from this child thread.
*/
* be in the process of killing this thread and no further activity
* should carried out. To be called only from this child thread.
*/
bool Failed() const { return failed; }
/**
@ -314,13 +317,13 @@ private:
* Returns true if there's at least one message pending for the child
* thread.
*/
bool HasIn() { return queue_in.Ready(); }
bool HasIn() { return queue_in.Ready(); }
/**
* Returns true if there's at least one message pending for the main
* thread.
*/
bool HasOut() { return queue_out.Ready(); }
bool HasOut() { return queue_out.Ready(); }
/**
* Returns true if there might be at least one message pending for
@ -334,25 +337,26 @@ private:
*/
void Finished();
Queue<BasicInputMessage *> queue_in;
Queue<BasicOutputMessage *> queue_out;
Queue<BasicInputMessage*> queue_in;
Queue<BasicOutputMessage*> queue_out;
std::atomic<uint64_t> cnt_sent_in; // Counts message sent to child.
std::atomic<uint64_t> cnt_sent_out; // Counts message sent by child.
std::atomic<uint64_t> cnt_sent_in; // Counts message sent to child.
std::atomic<uint64_t> cnt_sent_out; // Counts message sent by child.
bool main_finished; // Main thread is finished, meaning child_finished propagated back through message queue.
bool child_finished; // Child thread is finished.
bool main_finished; // Main thread is finished, meaning child_finished propagated back through
// message queue.
bool child_finished; // Child thread is finished.
bool child_sent_finish; // Child thread asked to be finished.
bool failed; // Set to true when a command failed.
bool failed; // Set to true when a command failed.
zeek::detail::Flare flare;
};
};
/**
* Base class for all message between Bro's main process and a MsgThread.
*/
class Message
{
{
public:
/**
* Destructor.
@ -378,18 +382,17 @@ protected:
* @param arg_name A descriptive name for the type of message. Used
* mainly for debugging purposes.
*/
explicit Message(const char* arg_name)
{ name = util::copy_string(arg_name); }
explicit Message(const char* arg_name) { name = util::copy_string(arg_name); }
private:
const char* name;
};
};
/**
* Base class for messages sent from Bro's main thread to a child MsgThread.
*/
class BasicInputMessage : public Message
{
{
protected:
/**
* Constructor.
@ -397,14 +400,14 @@ protected:
* @param name A descriptive name for the type of message. Used
* mainly for debugging purposes.
*/
explicit BasicInputMessage(const char* name) : Message(name) {}
};
explicit BasicInputMessage(const char* name) : Message(name) { }
};
/**
* Base class for messages sent from a child MsgThread to Bro's main thread.
*/
class BasicOutputMessage : public Message
{
{
protected:
/**
* Constructor.
@ -412,16 +415,15 @@ protected:
* @param name A descriptive name for the type of message. Used
* mainly for debugging purposes.
*/
explicit BasicOutputMessage(const char* name) : Message(name) {}
};
explicit BasicOutputMessage(const char* name) : Message(name) { }
};
/**
* A parameterized InputMessage that stores a pointer to an argument object.
* Normally, the objects will be used from the Process() callback.
*/
template<typename O>
class InputMessage : public BasicInputMessage
{
template <typename O> class InputMessage : public BasicInputMessage
{
public:
/**
* Returns the objects passed to the constructor.
@ -437,20 +439,18 @@ protected:
*
* @param arg_object: An object to store with the message.
*/
InputMessage(const char* name, O* arg_object) : BasicInputMessage(name)
{ object = arg_object; }
InputMessage(const char* name, O* arg_object) : BasicInputMessage(name) { object = arg_object; }
private:
O* object;
};
};
/**
* A parameterized OutputMessage that stores a pointer to an argument object.
* Normally, the objects will be used from the Process() callback.
*/
template<typename O>
class OutputMessage : public BasicOutputMessage
{
template <typename O> class OutputMessage : public BasicOutputMessage
{
public:
/**
* Returns the objects passed to the constructor.
@ -467,10 +467,12 @@ protected:
* @param arg_object An object to store with the message.
*/
OutputMessage(const char* name, O* arg_object) : BasicOutputMessage(name)
{ object = arg_object; }
{
object = arg_object;
}
private:
O* object;
};
};
} // namespace zeek::threading
} // namespace zeek::threading

View file

@ -1,18 +1,19 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <queue>
#include <deque>
#include <stdint.h>
#include <sys/time.h>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <queue>
#include "zeek/Reporter.h"
#include "zeek/threading/BasicThread.h"
#undef Queue // Defined elsewhere unfortunately.
namespace zeek::threading {
namespace zeek::threading
{
/**
* A thread-safe single-reader single-writer queue.
@ -25,9 +26,8 @@ namespace zeek::threading {
* TODO: Unclear how critical performance is for this qeueue. We could likely
* optimize it further if helpful.
*/
template<typename T>
class Queue
{
template <typename T> class Queue
{
public:
/**
* Constructor.
@ -87,8 +87,8 @@ public:
*/
struct Stats
{
uint64_t num_reads; //! Number of messages read from the queue.
uint64_t num_writes; //! Number of messages written to the queue.
uint64_t num_reads; //! Number of messages read from the queue.
uint64_t num_writes; //! Number of messages written to the queue.
};
/**
@ -104,12 +104,12 @@ private:
std::vector<std::unique_lock<std::mutex>> LocksForAllQueues();
std::mutex mutex[NUM_QUEUES]; // Mutex protected shared accesses.
std::condition_variable has_data[NUM_QUEUES]; // Signals when data becomes available
std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages
std::mutex mutex[NUM_QUEUES]; // Mutex protected shared accesses.
std::condition_variable has_data[NUM_QUEUES]; // Signals when data becomes available
std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages
int read_ptr; // Where the next operation will read from
int write_ptr; // Where the next operation will write to
int read_ptr; // Where the next operation will read from
int write_ptr; // Where the next operation will write to
BasicThread* reader;
BasicThread* writer;
@ -117,7 +117,7 @@ private:
// Statistics.
uint64_t num_reads;
uint64_t num_writes;
};
};
inline static std::unique_lock<std::mutex> acquire_lock(std::mutex& m)
{
@ -133,8 +133,7 @@ inline static std::unique_lock<std::mutex> acquire_lock(std::mutex& m)
}
}
template<typename T>
inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
template <typename T> inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
{
read_ptr = 0;
write_ptr = 0;
@ -143,19 +142,16 @@ inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
writer = arg_writer;
}
template<typename T>
inline Queue<T>::~Queue()
{
}
template <typename T> inline Queue<T>::~Queue() { }
template<typename T>
inline T Queue<T>::Get()
template <typename T> inline T Queue<T>::Get()
{
auto lock = acquire_lock(mutex[read_ptr]);
int old_read_ptr = read_ptr;
if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) )
if ( messages[read_ptr].empty() &&
! ((reader && reader->Killed()) || (writer && writer->Killed())) )
{
if ( has_data[read_ptr].wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout )
return nullptr;
@ -173,8 +169,7 @@ inline T Queue<T>::Get()
return data;
}
template<typename T>
inline void Queue<T>::Put(T data)
template <typename T> inline void Queue<T>::Put(T data)
{
auto lock = acquire_lock(mutex[write_ptr]);
@ -194,9 +189,7 @@ inline void Queue<T>::Put(T data)
}
}
template<typename T>
inline bool Queue<T>::Ready()
template <typename T> inline bool Queue<T>::Ready()
{
auto lock = acquire_lock(mutex[read_ptr]);
@ -205,8 +198,7 @@ inline bool Queue<T>::Ready()
return ret;
}
template<typename T>
inline std::vector<std::unique_lock<std::mutex>> Queue<T>::LocksForAllQueues()
template <typename T> inline std::vector<std::unique_lock<std::mutex>> Queue<T>::LocksForAllQueues()
{
std::vector<std::unique_lock<std::mutex>> locks;
@ -226,8 +218,7 @@ inline std::vector<std::unique_lock<std::mutex>> Queue<T>::LocksForAllQueues()
return locks;
}
template<typename T>
inline uint64_t Queue<T>::Size()
template <typename T> inline uint64_t Queue<T>::Size()
{
// Need to lock all queues.
auto locks = LocksForAllQueues();
@ -240,8 +231,7 @@ inline uint64_t Queue<T>::Size()
return size;
}
template<typename T>
inline void Queue<T>::GetStats(Stats* stats)
template <typename T> inline void Queue<T>::GetStats(Stats* stats)
{
// To be safe, we look all queues. That's probably unneccessary, but
// doesn't really hurt.
@ -251,8 +241,7 @@ inline void Queue<T>::GetStats(Stats* stats)
stats->num_writes = num_writes;
}
template<typename T>
inline void Queue<T>::WakeUp()
template <typename T> inline void Queue<T>::WakeUp()
{
for ( int i = 0; i < NUM_QUEUES; i++ )
{
@ -261,4 +250,4 @@ inline void Queue<T>::WakeUp()
}
}
} // namespace zeek::threading
} // namespace zeek::threading

File diff suppressed because it is too large Load diff

View file

@ -1,58 +1,67 @@
#pragma once
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include "zeek/Type.h"
#include "zeek/net_util.h"
namespace zeek::detail { class SerializationFormat; }
namespace zeek::detail
{
class SerializationFormat;
}
namespace zeek::threading {
namespace zeek::threading
{
/**
* Definition of a log file, i.e., one column of a log stream.
*/
struct Field {
const char* name; //! Name of the field.
struct Field
{
const char* name; //! Name of the field.
//! Needed by input framework. Port fields have two names (one for the
//! port, one for the type), and this specifies the secondary name.
const char* secondary_name;
TypeTag type; //! Type of the field.
TypeTag subtype; //! Inner type for sets and vectors.
bool optional; //! True if field is optional.
TypeTag type; //! Type of the field.
TypeTag subtype; //! Inner type for sets and vectors.
bool optional; //! True if field is optional.
/**
* Constructor.
*/
Field(const char* name, const char* secondary_name, TypeTag type, TypeTag subtype, bool optional)
: name(util::copy_string(name)),
secondary_name(util::copy_string(secondary_name)),
type(type), subtype(subtype), optional(optional) { }
Field(const char* name, const char* secondary_name, TypeTag type, TypeTag subtype,
bool optional)
: name(util::copy_string(name)), secondary_name(util::copy_string(secondary_name)),
type(type), subtype(subtype), optional(optional)
{
}
/**
* Copy constructor.
*/
Field(const Field& other)
: name(util::copy_string(other.name)),
secondary_name(util::copy_string(other.secondary_name)),
type(other.type), subtype(other.subtype), optional(other.optional) { }
secondary_name(util::copy_string(other.secondary_name)), type(other.type),
subtype(other.subtype), optional(other.optional)
{
}
~Field()
{
delete [] name;
delete [] secondary_name;
delete[] name;
delete[] secondary_name;
}
Field& operator=(const Field& other)
{
if ( this != &other )
{
delete [] name;
delete [] secondary_name;
delete[] name;
delete[] secondary_name;
name = util::copy_string(other.name);
secondary_name = util::copy_string(other.secondary_name);
type = other.type;
@ -91,8 +100,8 @@ struct Field {
private:
// Force usage of constructor above.
Field() {}
};
Field() { }
};
/**
* Definition of a log value, i.e., a entry logged by a stream.
@ -100,22 +109,32 @@ private:
* This struct essentialy represents a serialization of a Val instance (for
* those Vals supported).
*/
struct Value {
TypeTag type; //! The type of the value.
TypeTag subtype; //! Inner type for sets and vectors.
bool present; //! False for optional record fields that are not set.
struct Value
{
TypeTag type; //! The type of the value.
TypeTag subtype; //! Inner type for sets and vectors.
bool present; //! False for optional record fields that are not set.
struct set_t { bro_int_t size; Value** vals; };
struct set_t
{
bro_int_t size;
Value** vals;
};
typedef set_t vec_t;
struct port_t { bro_uint_t port; TransportProto proto; };
struct port_t
{
bro_uint_t port;
TransportProto proto;
};
struct addr_t {
struct addr_t
{
IPFamily family;
union {
union {
struct in_addr in4;
struct in6_addr in6;
} in;
};
} in;
};
// A small note for handling subnet values: Subnet values emitted from
// the logging framework will always have a length that is based on the
@ -124,13 +143,17 @@ struct Value {
// However, the Input framework expects the "normal" length for an IPv4
// address (so do not add 96 to it), because the underlying constructors
// for the SubNet type want it like this.
struct subnet_t { addr_t prefix; uint8_t length; };
struct subnet_t
{
addr_t prefix;
uint8_t length;
};
/**
* This union is a subset of the "underlying" values in Val subclasses,
* including only the types we can log directly. See IsCompatibleType().
*/
union _val {
/**
* This union is a subset of the "underlying" values in Val subclasses,
* including only the types we can log directly. See IsCompatibleType().
*/
union _val {
bro_int_t int_val;
bro_uint_t uint_val;
port_t port_val;
@ -141,39 +164,42 @@ struct Value {
subnet_t subnet_val;
const char* pattern_text_val;
struct {
struct
{
char* data;
int length;
} string_val;
} string_val;
_val() { memset(this, 0, sizeof(_val)); }
} val;
} val;
/**
* Constructor.
*
* arg_type: The type of the value.
*
* arg_present: False if the value represents an optional record field
* that is not set.
*/
* Constructor.
*
* arg_type: The type of the value.
*
* arg_present: False if the value represents an optional record field
* that is not set.
*/
Value(TypeTag arg_type = TYPE_ERROR, bool arg_present = true)
: type(arg_type), subtype(TYPE_VOID), present(arg_present)
{}
{
}
/**
* Constructor.
*
* arg_type: The type of the value.
*
* arg_type: The subtype of the value for sets and vectors.
*
* arg_present: False if the value represents an optional record field
* that is not set.
* Constructor.
*
* arg_type: The type of the value.
*
* arg_type: The subtype of the value for sets and vectors.
*
* arg_present: False if the value represents an optional record field
* that is not set.
*/
Value(TypeTag arg_type, TypeTag arg_subtype, bool arg_present = true)
: type(arg_type), subtype(arg_subtype), present(arg_present)
{}
{
}
/**
* Destructor.
@ -203,7 +229,7 @@ struct Value {
* Returns true if the type can be represented by a Value. If
* `atomic_only` is true, will not permit composite types. This
* method is thread-safe. */
static bool IsCompatibleType(Type* t, bool atomic_only=false);
static bool IsCompatibleType(Type* t, bool atomic_only = false);
/**
* Convenience function to delete an array of value pointers.
@ -213,21 +239,23 @@ struct Value {
static void delete_value_ptr_array(Value** vals, int num_fields);
/**
* Convert threading::Value to an internal Zeek type, just using the information given in the threading::Value.
* Convert threading::Value to an internal Zeek type, just using the information given in the
* threading::Value.
*
* @param source Name of the source of this threading value. This is used for warnings that are raised
* in case an error occurs.
* @param source Name of the source of this threading value. This is used for warnings that are
* raised in case an error occurs.
* @param val Threading Value to convert to a Zeek Val.
* @param have_error Reference to a boolean. This should be set to false when passed in and is set to true
* in case an error occurs. If this is set to false when the function is called, the function
* immediately aborts.
* @param have_error Reference to a boolean. This should be set to false when passed in and is
* set to true in case an error occurs. If this is set to false when the function is called, the
* function immediately aborts.
* @return Val representation of the threading::Value. nullptr on error.
*/
static Val* ValueToVal(const std::string& source, const threading::Value* val, bool& have_error);
static Val* ValueToVal(const std::string& source, const threading::Value* val,
bool& have_error);
private:
friend class IPAddr;
Value(const Value& other) = delete;
};
};
} // namespace zeek::threading
} // namespace zeek::threading

View file

@ -1,6 +1,5 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/zeek-config.h"
#include "threading/formatters/Ascii.h"
#include <errno.h>
@ -8,15 +7,18 @@
#include "zeek/Desc.h"
#include "zeek/threading/MsgThread.h"
#include "zeek/zeek-config.h"
using namespace std;
namespace zeek::threading::formatter {
namespace zeek::threading::formatter
{
// If the value we'd write out would match exactly the a reserved string, we
// escape the first character so that the output won't be ambigious. If this
// function returns true, it has added an escaped version of data to desc.
static inline bool escapeReservedContent(ODesc* desc, const string& reserved, const char* data, int size)
static inline bool escapeReservedContent(ODesc* desc, const string& reserved, const char* data,
int size)
{
if ( size != (int)reserved.size() || memcmp(data, reserved.data(), size) != 0 )
return false;
@ -28,7 +30,6 @@ static inline bool escapeReservedContent(ODesc* desc, const string& reserved, co
return true;
}
Ascii::SeparatorInfo::SeparatorInfo()
{
separator = "SHOULD_NOT_BE_USED";
@ -37,10 +38,8 @@ Ascii::SeparatorInfo::SeparatorInfo()
empty_field = "SHOULD_NOT_BE_USED";
}
Ascii::SeparatorInfo::SeparatorInfo(const string& arg_separator,
const string& arg_set_separator,
const string& arg_unset_field,
const string& arg_empty_field)
Ascii::SeparatorInfo::SeparatorInfo(const string& arg_separator, const string& arg_set_separator,
const string& arg_unset_field, const string& arg_empty_field)
{
separator = arg_separator;
set_separator = arg_set_separator;
@ -53,12 +52,9 @@ Ascii::Ascii(MsgThread* t, const SeparatorInfo& info) : Formatter(t)
separators = info;
}
Ascii::~Ascii()
{
}
Ascii::~Ascii() { }
bool Ascii::Describe(ODesc* desc, int num_fields, const Field* const * fields,
Value** vals) const
bool Ascii::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const
{
for ( int i = 0; i < num_fields; i++ )
{
@ -80,137 +76,139 @@ bool Ascii::Describe(ODesc* desc, Value* val, const string& name) const
return true;
}
switch ( val->type ) {
case TYPE_BOOL:
desc->Add(val->val.int_val ? "T" : "F");
break;
case TYPE_INT:
desc->Add(val->val.int_val);
break;
case TYPE_COUNT:
desc->Add(val->val.uint_val);
break;
case TYPE_PORT:
desc->Add(val->val.port_val.port);
break;
case TYPE_SUBNET:
desc->Add(Render(val->val.subnet_val));
break;
case TYPE_ADDR:
desc->Add(Render(val->val.addr_val));
break;
case TYPE_DOUBLE:
// Rendering via Add() truncates trailing 0s after the
// decimal point. The difference with TIME/INTERVAL is mainly
// to keep the log format consistent.
desc->Add(val->val.double_val, true);
break;
case TYPE_INTERVAL:
case TYPE_TIME:
// Rendering via Render() keeps trailing 0s after the decimal
// point. The difference with DOUBLE is mainly to keep the
// log format consistent.
desc->Add(Render(val->val.double_val));
break;
case TYPE_ENUM:
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
switch ( val->type )
{
int size = val->val.string_val.length;
const char* data = val->val.string_val.data;
if ( ! size )
{
desc->Add(separators.empty_field);
break;
}
if ( escapeReservedContent(desc, separators.unset_field, data, size) )
case TYPE_BOOL:
desc->Add(val->val.int_val ? "T" : "F");
break;
if ( escapeReservedContent(desc, separators.empty_field, data, size) )
case TYPE_INT:
desc->Add(val->val.int_val);
break;
desc->AddN(data, size);
break;
}
case TYPE_TABLE:
{
if ( ! val->val.set_val.size )
{
desc->Add(separators.empty_field);
case TYPE_COUNT:
desc->Add(val->val.uint_val);
break;
}
desc->AddEscapeSequence(separators.set_separator);
case TYPE_PORT:
desc->Add(val->val.port_val.port);
break;
for ( bro_int_t j = 0; j < val->val.set_val.size; j++ )
{
if ( j > 0 )
desc->AddRaw(separators.set_separator);
case TYPE_SUBNET:
desc->Add(Render(val->val.subnet_val));
break;
if ( ! Describe(desc, val->val.set_val.vals[j]) )
case TYPE_ADDR:
desc->Add(Render(val->val.addr_val));
break;
case TYPE_DOUBLE:
// Rendering via Add() truncates trailing 0s after the
// decimal point. The difference with TIME/INTERVAL is mainly
// to keep the log format consistent.
desc->Add(val->val.double_val, true);
break;
case TYPE_INTERVAL:
case TYPE_TIME:
// Rendering via Render() keeps trailing 0s after the decimal
// point. The difference with DOUBLE is mainly to keep the
// log format consistent.
desc->Add(Render(val->val.double_val));
break;
case TYPE_ENUM:
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
{
desc->RemoveEscapeSequence(separators.set_separator);
return false;
int size = val->val.string_val.length;
const char* data = val->val.string_val.data;
if ( ! size )
{
desc->Add(separators.empty_field);
break;
}
if ( escapeReservedContent(desc, separators.unset_field, data, size) )
break;
if ( escapeReservedContent(desc, separators.empty_field, data, size) )
break;
desc->AddN(data, size);
break;
}
}
desc->RemoveEscapeSequence(separators.set_separator);
break;
}
case TYPE_VECTOR:
{
if ( ! val->val.vector_val.size )
{
desc->Add(separators.empty_field);
break;
}
desc->AddEscapeSequence(separators.set_separator);
for ( bro_int_t j = 0; j < val->val.vector_val.size; j++ )
{
if ( j > 0 )
desc->AddRaw(separators.set_separator);
if ( ! Describe(desc, val->val.vector_val.vals[j]) )
case TYPE_TABLE:
{
if ( ! val->val.set_val.size )
{
desc->Add(separators.empty_field);
break;
}
desc->AddEscapeSequence(separators.set_separator);
for ( bro_int_t j = 0; j < val->val.set_val.size; j++ )
{
if ( j > 0 )
desc->AddRaw(separators.set_separator);
if ( ! Describe(desc, val->val.set_val.vals[j]) )
{
desc->RemoveEscapeSequence(separators.set_separator);
return false;
}
}
desc->RemoveEscapeSequence(separators.set_separator);
return false;
break;
}
}
desc->RemoveEscapeSequence(separators.set_separator);
case TYPE_VECTOR:
{
if ( ! val->val.vector_val.size )
{
desc->Add(separators.empty_field);
break;
}
break;
desc->AddEscapeSequence(separators.set_separator);
for ( bro_int_t j = 0; j < val->val.vector_val.size; j++ )
{
if ( j > 0 )
desc->AddRaw(separators.set_separator);
if ( ! Describe(desc, val->val.vector_val.vals[j]) )
{
desc->RemoveEscapeSequence(separators.set_separator);
return false;
}
}
desc->RemoveEscapeSequence(separators.set_separator);
break;
}
default:
GetThread()->Warning(
GetThread()->Fmt("Ascii writer unsupported field format %d", val->type));
return false;
}
default:
GetThread()->Warning(GetThread()->Fmt("Ascii writer unsupported field format %d", val->type));
return false;
}
return true;
}
Value* Ascii::ParseValue(const string& s, const string& name, TypeTag type, TypeTag subtype) const
{
if ( ! separators.unset_field.empty() && s.compare(separators.unset_field) == 0 ) // field is not set...
if ( ! separators.unset_field.empty() &&
s.compare(separators.unset_field) == 0 ) // field is not set...
return new Value(type, false);
Value* val = new Value(type, subtype, true);
@ -219,255 +217,261 @@ Value* Ascii::ParseValue(const string& s, const string& name, TypeTag type, Type
errno = 0;
size_t pos;
switch ( type ) {
case TYPE_ENUM:
case TYPE_STRING:
switch ( type )
{
string unescaped = util::get_unescaped_string(s);
val->val.string_val.length = unescaped.size();
val->val.string_val.data = new char[val->val.string_val.length];
// we do not need a zero-byte at the end - the input manager adds that explicitly
memcpy(val->val.string_val.data, unescaped.data(), unescaped.size());
break;
}
case TYPE_ENUM:
case TYPE_STRING:
{
string unescaped = util::get_unescaped_string(s);
val->val.string_val.length = unescaped.size();
val->val.string_val.data = new char[val->val.string_val.length];
// we do not need a zero-byte at the end - the input manager adds that explicitly
memcpy(val->val.string_val.data, unescaped.data(), unescaped.size());
break;
}
case TYPE_BOOL:
{
auto stripped = util::strstrip(s);
if ( stripped == "T" || stripped == "1" )
val->val.int_val = 1;
else if ( stripped == "F" || stripped == "0" )
val->val.int_val = 0;
else
{
GetThread()->Warning(GetThread()->Fmt("Field: %s Invalid value for boolean: %s",
name.c_str(), start));
goto parse_error;
}
break;
}
case TYPE_BOOL:
{
auto stripped = util::strstrip(s);
if ( stripped == "T" || stripped == "1" )
val->val.int_val = 1;
else if ( stripped == "F" || stripped == "0" )
val->val.int_val = 0;
else
{
GetThread()->Warning(GetThread()->Fmt("Field: %s Invalid value for boolean: %s",
name.c_str(), start));
goto parse_error;
}
break;
}
case TYPE_INT:
val->val.int_val = strtoll(start, &end, 10);
if ( CheckNumberError(start, end) )
goto parse_error;
break;
case TYPE_INT:
val->val.int_val = strtoll(start, &end, 10);
if ( CheckNumberError(start, end) )
goto parse_error;
break;
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
val->val.double_val = strtod(start, &end);
if ( CheckNumberError(start, end) )
goto parse_error;
break;
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
val->val.double_val = strtod(start, &end);
if ( CheckNumberError(start, end) )
goto parse_error;
break;
case TYPE_COUNT:
val->val.uint_val = strtoull(start, &end, 10);
if ( CheckNumberError(start, end, true) )
goto parse_error;
break;
case TYPE_COUNT:
val->val.uint_val = strtoull(start, &end, 10);
if ( CheckNumberError(start, end, true) )
goto parse_error;
break;
case TYPE_PORT:
{
auto stripped = util::strstrip(s);
val->val.port_val.proto = TRANSPORT_UNKNOWN;
pos = stripped.find('/');
string numberpart;
if ( pos != std::string::npos && stripped.length() > pos + 1 )
{
auto proto = stripped.substr(pos+1);
if ( util::strtolower(proto) == "tcp" )
val->val.port_val.proto = TRANSPORT_TCP;
else if ( util::strtolower(proto) == "udp" )
val->val.port_val.proto = TRANSPORT_UDP;
else if ( util::strtolower(proto) == "icmp" )
val->val.port_val.proto = TRANSPORT_ICMP;
else if ( util::strtolower(proto) == "unknown" )
case TYPE_PORT:
{
auto stripped = util::strstrip(s);
val->val.port_val.proto = TRANSPORT_UNKNOWN;
else
GetThread()->Warning(GetThread()->Fmt("Port '%s' contained unknown protocol '%s'", s.c_str(), proto.c_str()));
}
pos = stripped.find('/');
string numberpart;
if ( pos != std::string::npos && stripped.length() > pos + 1 )
{
auto proto = stripped.substr(pos + 1);
if ( util::strtolower(proto) == "tcp" )
val->val.port_val.proto = TRANSPORT_TCP;
else if ( util::strtolower(proto) == "udp" )
val->val.port_val.proto = TRANSPORT_UDP;
else if ( util::strtolower(proto) == "icmp" )
val->val.port_val.proto = TRANSPORT_ICMP;
else if ( util::strtolower(proto) == "unknown" )
val->val.port_val.proto = TRANSPORT_UNKNOWN;
else
GetThread()->Warning(GetThread()->Fmt(
"Port '%s' contained unknown protocol '%s'", s.c_str(), proto.c_str()));
}
if ( pos != std::string::npos && pos > 0 )
{
numberpart = stripped.substr(0, pos);
start = numberpart.c_str();
}
val->val.port_val.port = strtoull(start, &end, 10);
if ( CheckNumberError(start, end, true) )
goto parse_error;
}
break;
case TYPE_SUBNET:
{
string unescaped = util::strstrip(util::get_unescaped_string(s));
size_t pos = unescaped.find('/');
if ( pos == unescaped.npos )
{
GetThread()->Warning(GetThread()->Fmt("Invalid value for subnet: %s", start));
goto parse_error;
}
string width_str = unescaped.substr(pos + 1);
uint8_t width = (uint8_t) strtol(width_str.c_str(), &end, 10);
if ( CheckNumberError(start, end) )
goto parse_error;
string addr = unescaped.substr(0, pos);
val->val.subnet_val.prefix = ParseAddr(addr);
val->val.subnet_val.length = width;
break;
}
case TYPE_ADDR:
{
string unescaped = util::strstrip(util::get_unescaped_string(s));
val->val.addr_val = ParseAddr(unescaped);
break;
}
case TYPE_PATTERN:
{
string candidate = util::get_unescaped_string(s);
// A string is a candidate pattern iff it begins and ends with
// a '/'. Rather or not the rest of the string is legal will
// be determined later when it is given to the RE engine.
if ( candidate.size() >= 2 )
{
if ( candidate.front() == candidate.back() &&
candidate.back() == '/' )
{
// Remove the '/'s
candidate.erase(0, 1);
candidate.erase(candidate.size() - 1);
val->val.pattern_text_val = util::copy_string(candidate.c_str());
break;
if ( pos != std::string::npos && pos > 0 )
{
numberpart = stripped.substr(0, pos);
start = numberpart.c_str();
}
val->val.port_val.port = strtoull(start, &end, 10);
if ( CheckNumberError(start, end, true) )
goto parse_error;
}
}
break;
GetThread()->Warning(GetThread()->Fmt("String '%s' contained no parseable pattern.", candidate.c_str()));
goto parse_error;
}
case TYPE_TABLE:
case TYPE_VECTOR:
// First - common initialization
// Then - initialization for table.
// Then - initialization for vector.
// Then - common stuff
{
// how many entries do we have...
unsigned int length = 1;
for ( const auto& c : s )
{
if ( c == separators.set_separator[0] )
length++;
}
unsigned int pos = 0;
bool error = false;
if ( separators.empty_field.size() > 0 && s.compare(separators.empty_field) == 0 )
length = 0;
if ( separators.empty_field.empty() && s.empty() )
length = 0;
Value** lvals = new Value* [length];
if ( type == TYPE_TABLE )
{
val->val.set_val.vals = lvals;
val->val.set_val.size = length;
}
else if ( type == TYPE_VECTOR )
{
val->val.vector_val.vals = lvals;
val->val.vector_val.size = length;
}
else
assert(false);
if ( length == 0 )
break; //empty
istringstream splitstream(s);
while ( splitstream )
{
string element;
if ( ! getline(splitstream, element, separators.set_separator[0]) )
break;
if ( pos >= length )
case TYPE_SUBNET:
{
GetThread()->Warning(GetThread()->Fmt("Internal error while parsing set. pos %d >= length %d."
" Element: %s", pos, length, element.c_str()));
error = true;
string unescaped = util::strstrip(util::get_unescaped_string(s));
size_t pos = unescaped.find('/');
if ( pos == unescaped.npos )
{
GetThread()->Warning(GetThread()->Fmt("Invalid value for subnet: %s", start));
goto parse_error;
}
string width_str = unescaped.substr(pos + 1);
uint8_t width = (uint8_t)strtol(width_str.c_str(), &end, 10);
if ( CheckNumberError(start, end) )
goto parse_error;
string addr = unescaped.substr(0, pos);
val->val.subnet_val.prefix = ParseAddr(addr);
val->val.subnet_val.length = width;
break;
}
Value* newval = ParseValue(element, name, subtype);
if ( newval == nullptr )
case TYPE_ADDR:
{
GetThread()->Warning("Error while reading set or vector");
error = true;
string unescaped = util::strstrip(util::get_unescaped_string(s));
val->val.addr_val = ParseAddr(unescaped);
break;
}
lvals[pos] = newval;
pos++;
}
// Test if the string ends with a set_separator... or if the
// complete string is empty. In either of these cases we have
// to push an empty val on top of it.
if ( ! error && (s.empty() || *s.rbegin() == separators.set_separator[0]) )
{
lvals[pos] = ParseValue("", name, subtype);
if ( lvals[pos] == nullptr )
case TYPE_PATTERN:
{
GetThread()->Warning("Error while trying to add empty set element");
string candidate = util::get_unescaped_string(s);
// A string is a candidate pattern iff it begins and ends with
// a '/'. Rather or not the rest of the string is legal will
// be determined later when it is given to the RE engine.
if ( candidate.size() >= 2 )
{
if ( candidate.front() == candidate.back() && candidate.back() == '/' )
{
// Remove the '/'s
candidate.erase(0, 1);
candidate.erase(candidate.size() - 1);
val->val.pattern_text_val = util::copy_string(candidate.c_str());
break;
}
}
GetThread()->Warning(GetThread()->Fmt("String '%s' contained no parseable pattern.",
candidate.c_str()));
goto parse_error;
}
pos++;
}
case TYPE_TABLE:
case TYPE_VECTOR:
// First - common initialization
// Then - initialization for table.
// Then - initialization for vector.
// Then - common stuff
{
// how many entries do we have...
unsigned int length = 1;
for ( const auto& c : s )
{
if ( c == separators.set_separator[0] )
length++;
}
if ( error ) {
// We had an error while reading a set or a vector.
// Hence we have to clean up the values that have
// been read so far
for ( unsigned int i = 0; i < pos; i++ )
delete lvals[i];
unsigned int pos = 0;
bool error = false;
// and set the length of the set to 0, otherwhise the destructor will crash.
val->val.vector_val.size = 0;
if ( separators.empty_field.size() > 0 && s.compare(separators.empty_field) == 0 )
length = 0;
if ( separators.empty_field.empty() && s.empty() )
length = 0;
Value** lvals = new Value*[length];
if ( type == TYPE_TABLE )
{
val->val.set_val.vals = lvals;
val->val.set_val.size = length;
}
else if ( type == TYPE_VECTOR )
{
val->val.vector_val.vals = lvals;
val->val.vector_val.size = length;
}
else
assert(false);
if ( length == 0 )
break; // empty
istringstream splitstream(s);
while ( splitstream )
{
string element;
if ( ! getline(splitstream, element, separators.set_separator[0]) )
break;
if ( pos >= length )
{
GetThread()->Warning(GetThread()->Fmt(
"Internal error while parsing set. pos %d >= length %d."
" Element: %s",
pos, length, element.c_str()));
error = true;
break;
}
Value* newval = ParseValue(element, name, subtype);
if ( newval == nullptr )
{
GetThread()->Warning("Error while reading set or vector");
error = true;
break;
}
lvals[pos] = newval;
pos++;
}
// Test if the string ends with a set_separator... or if the
// complete string is empty. In either of these cases we have
// to push an empty val on top of it.
if ( ! error && (s.empty() || *s.rbegin() == separators.set_separator[0]) )
{
lvals[pos] = ParseValue("", name, subtype);
if ( lvals[pos] == nullptr )
{
GetThread()->Warning("Error while trying to add empty set element");
goto parse_error;
}
pos++;
}
if ( error )
{
// We had an error while reading a set or a vector.
// Hence we have to clean up the values that have
// been read so far
for ( unsigned int i = 0; i < pos; i++ )
delete lvals[i];
// and set the length of the set to 0, otherwhise the destructor will crash.
val->val.vector_val.size = 0;
goto parse_error;
}
if ( pos != length )
{
GetThread()->Warning(GetThread()->Fmt(
"Internal error while parsing set: did not find all elements: %s", start));
goto parse_error;
}
break;
}
default:
GetThread()->Warning(
GetThread()->Fmt("unsupported field format %d for %s", type, name.c_str()));
goto parse_error;
}
if ( pos != length )
{
GetThread()->Warning(GetThread()->Fmt("Internal error while parsing set: did not find all elements: %s", start));
goto parse_error;
}
break;
}
default:
GetThread()->Warning(GetThread()->Fmt("unsupported field format %d for %s", type,
name.c_str()));
goto parse_error;
}
return val;
parse_error:
@ -479,10 +483,11 @@ bool Ascii::CheckNumberError(const char* start, const char* end, bool nonneg_onl
{
MsgThread* thread = GetThread();
if ( end == start && *end != '\0' ) {
if ( end == start && *end != '\0' )
{
thread->Warning(thread->Fmt("String '%s' contained no parseable number", start));
return true;
}
}
if ( end - start == 0 && *end == '\0' )
{
@ -491,19 +496,23 @@ bool Ascii::CheckNumberError(const char* start, const char* end, bool nonneg_onl
}
if ( (*end != '\0') )
thread->Warning(thread->Fmt("Number '%s' contained non-numeric trailing characters. Ignored trailing characters '%s'", start, end));
thread->Warning(thread->Fmt("Number '%s' contained non-numeric trailing characters. "
"Ignored trailing characters '%s'",
start, end));
if ( nonneg_only ) {
if ( nonneg_only )
{
// String may legitimately start with whitespace, so
// we skip this before checking for a minus sign.
const char* s = start;
while ( s < end && isspace(*s) )
s++;
if ( *s == '-' ) {
if ( *s == '-' )
{
thread->Warning(thread->Fmt("Number '%s' cannot be negative", start));
return true;
}
}
}
if ( errno == EINVAL )
{
@ -520,4 +529,4 @@ bool Ascii::CheckNumberError(const char* start, const char* end, bool nonneg_onl
return false;
}
} // namespace zeek::threading::formatter
} // namespace zeek::threading::formatter

View file

@ -4,9 +4,11 @@
#include "zeek/threading/Formatter.h"
namespace zeek::threading::formatter {
namespace zeek::threading::formatter
{
class Ascii final : public Formatter {
class Ascii final : public Formatter
{
public:
/**
* A struct to pass the necessary configuration values to the
@ -14,10 +16,10 @@ public:
*/
struct SeparatorInfo
{
std::string separator; // Separator between columns
std::string set_separator; // Separator between set elements.
std::string unset_field; // String marking an unset field.
std::string empty_field; // String marking an empty (but set) field.
std::string separator; // Separator between columns
std::string set_separator; // Separator between set elements.
std::string unset_field; // String marking an unset field.
std::string empty_field; // String marking an empty (but set) field.
/**
* Constructor that defines all the configuration options.
@ -48,15 +50,15 @@ public:
virtual ~Ascii();
virtual bool Describe(ODesc* desc, Value* val, const std::string& name = "") const;
virtual bool Describe(ODesc* desc, int num_fields, const Field* const * fields,
virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields,
Value** vals) const;
virtual Value* ParseValue(const std::string& s, const std::string& name,
TypeTag type, TypeTag subtype = TYPE_ERROR) const;
virtual Value* ParseValue(const std::string& s, const std::string& name, TypeTag type,
TypeTag subtype = TYPE_ERROR) const;
private:
bool CheckNumberError(const char* start, const char* end, bool nonneg_only = false) const;
SeparatorInfo separators;
};
};
} // namespace zeek::threading::formatter
} // namespace zeek::threading::formatter

View file

@ -1,23 +1,24 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "zeek/zeek-config.h"
#include "zeek/threading/formatters/JSON.h"
#include "zeek/zeek-config.h"
#ifndef __STDC_LIMIT_MACROS
#define __STDC_LIMIT_MACROS
#endif
#include <errno.h>
#include <math.h>
#include <rapidjson/internal/ieee754.h>
#include <stdint.h>
#include <sstream>
#include <rapidjson/internal/ieee754.h>
#include "zeek/Desc.h"
#include "zeek/threading/MsgThread.h"
namespace zeek::threading::formatter {
namespace zeek::threading::formatter
{
bool JSON::NullDoubleWriter::Double(double d)
{
@ -32,12 +33,9 @@ JSON::JSON(MsgThread* t, TimeFormat tf) : Formatter(t), surrounding_braces(true)
timestamps = tf;
}
JSON::~JSON()
{
}
JSON::~JSON() { }
bool JSON::Describe(ODesc* desc, int num_fields, const Field* const * fields,
Value** vals) const
bool JSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const
{
rapidjson::StringBuffer buffer;
NullDoubleWriter writer(buffer);
@ -79,8 +77,8 @@ bool JSON::Describe(ODesc* desc, Value* val, const std::string& name) const
return true;
}
Value* JSON::ParseValue(const std::string& s, const std::string& name,
TypeTag type, TypeTag subtype) const
Value* JSON::ParseValue(const std::string& s, const std::string& name, TypeTag type,
TypeTag subtype) const
{
GetThread()->Error("JSON formatter does not support parsing yet.");
return nullptr;
@ -129,78 +127,80 @@ void JSON::BuildJSON(NullDoubleWriter& writer, Value* val, const std::string& na
break;
case TYPE_TIME:
{
if ( timestamps == TS_ISO8601 )
{
char buffer[40];
char buffer2[48];
time_t the_time = time_t(floor(val->val.double_val));
struct tm t;
if ( ! gmtime_r(&the_time, &t) ||
! strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%S", &t) )
if ( timestamps == TS_ISO8601 )
{
GetThread()->Error(GetThread()->Fmt("json formatter: failure getting time: (%lf)", val->val.double_val));
// This was a failure, doesn't really matter what gets put here
// but it should probably stand out...
writer.String("2000-01-01T00:00:00.000000");
char buffer[40];
char buffer2[48];
time_t the_time = time_t(floor(val->val.double_val));
struct tm t;
if ( ! gmtime_r(&the_time, &t) ||
! strftime(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%S", &t) )
{
GetThread()->Error(GetThread()->Fmt(
"json formatter: failure getting time: (%lf)", val->val.double_val));
// This was a failure, doesn't really matter what gets put here
// but it should probably stand out...
writer.String("2000-01-01T00:00:00.000000");
}
else
{
double integ;
double frac = modf(val->val.double_val, &integ);
if ( frac < 0 )
frac += 1;
snprintf(buffer2, sizeof(buffer2), "%s.%06.0fZ", buffer,
fabs(frac) * 1000000);
writer.String(buffer2, strlen(buffer2));
}
}
else
else if ( timestamps == TS_EPOCH )
writer.Double(val->val.double_val);
else if ( timestamps == TS_MILLIS )
{
double integ;
double frac = modf(val->val.double_val, &integ);
if ( frac < 0 )
frac += 1;
snprintf(buffer2, sizeof(buffer2), "%s.%06.0fZ", buffer, fabs(frac) * 1000000);
writer.String(buffer2, strlen(buffer2));
// ElasticSearch uses milliseconds for timestamps
writer.Uint64((uint64_t)(val->val.double_val * 1000));
}
break;
}
else if ( timestamps == TS_EPOCH )
writer.Double(val->val.double_val);
else if ( timestamps == TS_MILLIS )
{
// ElasticSearch uses milliseconds for timestamps
writer.Uint64((uint64_t) (val->val.double_val * 1000));
}
break;
}
case TYPE_ENUM:
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
{
writer.String(util::json_escape_utf8(
std::string(val->val.string_val.data, val->val.string_val.length)));
break;
}
{
writer.String(util::json_escape_utf8(
std::string(val->val.string_val.data, val->val.string_val.length)));
break;
}
case TYPE_TABLE:
{
writer.StartArray();
{
writer.StartArray();
for ( bro_int_t idx = 0; idx < val->val.set_val.size; idx++ )
BuildJSON(writer, val->val.set_val.vals[idx]);
for ( bro_int_t idx = 0; idx < val->val.set_val.size; idx++ )
BuildJSON(writer, val->val.set_val.vals[idx]);
writer.EndArray();
break;
}
writer.EndArray();
break;
}
case TYPE_VECTOR:
{
writer.StartArray();
{
writer.StartArray();
for ( bro_int_t idx = 0; idx < val->val.vector_val.size; idx++ )
BuildJSON(writer, val->val.vector_val.vals[idx]);
for ( bro_int_t idx = 0; idx < val->val.vector_val.size; idx++ )
BuildJSON(writer, val->val.vector_val.vals[idx]);
writer.EndArray();
break;
}
writer.EndArray();
break;
}
default:
reporter->Warning("Unhandled type in JSON::BuildJSON");
@ -208,4 +208,4 @@ void JSON::BuildJSON(NullDoubleWriter& writer, Value* val, const std::string& na
}
}
} // namespace zeek::threading::formatter
} // namespace zeek::threading::formatter

View file

@ -8,40 +8,48 @@
#include "zeek/threading/Formatter.h"
namespace zeek::threading::formatter {
namespace zeek::threading::formatter
{
/**
* A thread-safe class for converting values into a JSON representation
* and vice versa.
*/
class JSON : public Formatter {
* A thread-safe class for converting values into a JSON representation
* and vice versa.
*/
class JSON : public Formatter
{
public:
enum TimeFormat {
TS_EPOCH, // Doubles that represents seconds from the UNIX epoch.
TS_ISO8601, // ISO 8601 defined human readable timestamp format.
TS_MILLIS // Milliseconds from the UNIX epoch. Some consumers need this (e.g., elasticsearch).
enum TimeFormat
{
TS_EPOCH, // Doubles that represents seconds from the UNIX epoch.
TS_ISO8601, // ISO 8601 defined human readable timestamp format.
TS_MILLIS // Milliseconds from the UNIX epoch. Some consumers need this (e.g.,
// elasticsearch).
};
JSON(MsgThread* t, TimeFormat tf);
~JSON() override;
bool Describe(ODesc* desc, Value* val, const std::string& name = "") const override;
bool Describe(ODesc* desc, int num_fields, const Field* const * fields,
bool Describe(ODesc* desc, int num_fields, const Field* const* fields,
Value** vals) const override;
Value* ParseValue(const std::string& s, const std::string& name, TypeTag type,
TypeTag subtype = TYPE_ERROR) const override;
class NullDoubleWriter : public rapidjson::Writer<rapidjson::StringBuffer> {
class NullDoubleWriter : public rapidjson::Writer<rapidjson::StringBuffer>
{
public:
NullDoubleWriter(rapidjson::StringBuffer& stream) : rapidjson::Writer<rapidjson::StringBuffer>(stream) {}
NullDoubleWriter(rapidjson::StringBuffer& stream)
: rapidjson::Writer<rapidjson::StringBuffer>(stream)
{
}
bool Double(double d);
};
};
private:
void BuildJSON(NullDoubleWriter& writer, Value* val, const std::string& name = "") const;
TimeFormat timestamps;
bool surrounding_braces;
};
};
} // namespace zeek::threading::formatter
} // namespace zeek::threading::formatter