mirror of
https://github.com/zeek/zeek.git
synced 2025-10-07 09:08:20 +00:00
Merging in DataSeries support from topic/gilbert/logging.
I copied the code over manually, no merging, because (1) it needed to be adapted to the new threading API, and (2) there's more stuff in the branch that I haven't ported yet. The DS output generally seems to work, but it has seen no further testing yet. Not unit tests yet either.
This commit is contained in:
parent
99e3c58494
commit
952b6b293a
18 changed files with 726 additions and 65 deletions
|
@ -107,6 +107,21 @@ if (GOOGLEPERFTOOLS_FOUND)
|
||||||
endif ()
|
endif ()
|
||||||
endif ()
|
endif ()
|
||||||
|
|
||||||
|
set(USE_DATASERIES false)
|
||||||
|
find_package(Lintel)
|
||||||
|
find_package(DataSeries)
|
||||||
|
find_package(LibXML2)
|
||||||
|
|
||||||
|
if (LINTEL_FOUND AND DATASERIES_FOUND AND LIBXML2_FOUND)
|
||||||
|
set(USE_DATASERIES true)
|
||||||
|
include_directories(BEFORE ${Lintel_INCLUDE_DIR})
|
||||||
|
include_directories(BEFORE ${DataSeries_INCLUDE_DIR})
|
||||||
|
include_directories(BEFORE ${LibXML2_INCLUDE_DIR})
|
||||||
|
list(APPEND OPTLIBS ${Lintel_LIBRARIES})
|
||||||
|
list(APPEND OPTLIBS ${DataSeries_LIBRARIES})
|
||||||
|
list(APPEND OPTLIBS ${LibXML2_LIBRARIES})
|
||||||
|
endif()
|
||||||
|
|
||||||
set(brodeps
|
set(brodeps
|
||||||
${BinPAC_LIBRARY}
|
${BinPAC_LIBRARY}
|
||||||
${PCAP_LIBRARY}
|
${PCAP_LIBRARY}
|
||||||
|
@ -193,6 +208,7 @@ message(
|
||||||
"\nGeoIP: ${USE_GEOIP}"
|
"\nGeoIP: ${USE_GEOIP}"
|
||||||
"\nGoogle perftools: ${USE_PERFTOOLS}"
|
"\nGoogle perftools: ${USE_PERFTOOLS}"
|
||||||
"\n debugging: ${USE_PERFTOOLS_DEBUG}"
|
"\n debugging: ${USE_PERFTOOLS_DEBUG}"
|
||||||
|
"\nDataSeries: ${USE_DATASERIES}"
|
||||||
"\n"
|
"\n"
|
||||||
"\n================================================================\n"
|
"\n================================================================\n"
|
||||||
)
|
)
|
||||||
|
|
2
cmake
2
cmake
|
@ -1 +1 @@
|
||||||
Subproject commit 550ab2c8d95b1d3e18e40a903152650e6c7a3c45
|
Subproject commit 60b28739379da75f26c5c2a312b7886f5209a1cc
|
|
@ -111,6 +111,9 @@
|
||||||
/* Use Google's perftools */
|
/* Use Google's perftools */
|
||||||
#cmakedefine USE_PERFTOOLS
|
#cmakedefine USE_PERFTOOLS
|
||||||
|
|
||||||
|
/* Use the DataSeries writer. */
|
||||||
|
#cmakedefine USE_DATASERIES
|
||||||
|
|
||||||
/* Version number of package */
|
/* Version number of package */
|
||||||
#define VERSION "@VERSION@"
|
#define VERSION "@VERSION@"
|
||||||
|
|
||||||
|
|
9
configure
vendored
9
configure
vendored
|
@ -54,6 +54,8 @@ Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||||
--with-ruby-lib=PATH path to ruby library
|
--with-ruby-lib=PATH path to ruby library
|
||||||
--with-ruby-inc=PATH path to ruby headers
|
--with-ruby-inc=PATH path to ruby headers
|
||||||
--with-swig=PATH path to SWIG executable
|
--with-swig=PATH path to SWIG executable
|
||||||
|
--with-dataseries=PATH path to DataSeries and Lintel libraries
|
||||||
|
--with-xml2=PATH path to libxml2 installation (for DataSeries)
|
||||||
|
|
||||||
Packaging Options (for developers):
|
Packaging Options (for developers):
|
||||||
--binary-package toggle special logic for binary packaging
|
--binary-package toggle special logic for binary packaging
|
||||||
|
@ -203,6 +205,13 @@ while [ $# -ne 0 ]; do
|
||||||
--with-swig=*)
|
--with-swig=*)
|
||||||
append_cache_entry SWIG_EXECUTABLE PATH $optarg
|
append_cache_entry SWIG_EXECUTABLE PATH $optarg
|
||||||
;;
|
;;
|
||||||
|
--with-dataseries=*)
|
||||||
|
append_cache_entry DataSeries_ROOT_DIR PATH $optarg
|
||||||
|
append_cache_entry Lintel_ROOT_DIR PATH $optarg
|
||||||
|
;;
|
||||||
|
--with-xml2=*)
|
||||||
|
append_cache_entry LibXML2_ROOT_DIR PATH $optarg
|
||||||
|
;;
|
||||||
--binary-package)
|
--binary-package)
|
||||||
append_cache_entry BINARY_PACKAGING_MODE BOOL true
|
append_cache_entry BINARY_PACKAGING_MODE BOOL true
|
||||||
;;
|
;;
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
@load ./main
|
@load ./main
|
||||||
@load ./postprocessors
|
@load ./postprocessors
|
||||||
@load ./writers/ascii
|
@load ./writers/ascii
|
||||||
|
@load ./writers/dataseries
|
||||||
|
|
62
scripts/base/frameworks/logging/writers/dataseries.bro
Normal file
62
scripts/base/frameworks/logging/writers/dataseries.bro
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
##! Interface for the dataseries log writer.
|
||||||
|
|
||||||
|
module LogDataSeries;
|
||||||
|
|
||||||
|
export {
|
||||||
|
## Compression to use with the DS output file. Options are:
|
||||||
|
##
|
||||||
|
## 'none' -- No compression.
|
||||||
|
## 'lzf' -- LZF compression. Very quick, but leads to larger output files.
|
||||||
|
## 'lzo' -- LZO compression. Very fast decompression times.
|
||||||
|
## 'gz' -- GZIP compression. Slower than LZF, but also produces smaller output.
|
||||||
|
## 'bz2' -- BZIP2 compression. Slower than GZIP, but also produces smaller output.
|
||||||
|
const ds_compression = "lzf" &redef;
|
||||||
|
|
||||||
|
## The extent buffer size.
|
||||||
|
## Larger values here lead to better compression and more efficient writes, but
|
||||||
|
## also increases the lag between the time events are received and the time they
|
||||||
|
## are actually written to disk.
|
||||||
|
const ds_extent_size = 65536 &redef;
|
||||||
|
|
||||||
|
## Should we dump the XML schema we use for this ds file to disk?
|
||||||
|
## If yes, the XML schema shares the name of the logfile, but has
|
||||||
|
## an XML ending.
|
||||||
|
const ds_dump_schema = T &redef;
|
||||||
|
|
||||||
|
## How many threads should DataSeries spawn to perform compression?
|
||||||
|
## Note that this dictates the number of threads per log stream. If
|
||||||
|
## you're using a lot of streams, you may want to keep this number
|
||||||
|
## relatively small.
|
||||||
|
##
|
||||||
|
## Default value is 1, which will spawn one thread / core / stream.
|
||||||
|
##
|
||||||
|
## MAX is 128, MIN is 1.
|
||||||
|
const ds_num_threads = 1 &redef;
|
||||||
|
|
||||||
|
## Should time be stored as an integer or a double?
|
||||||
|
## Storing time as a double leads to possible precision issues and
|
||||||
|
## could (significantly) increase the size of the resulting DS log.
|
||||||
|
## That said, timestamps stored in double form are more consistent
|
||||||
|
## with the rest of Bro and are more easily readable / understandable
|
||||||
|
## when working with the raw DataSeries format.
|
||||||
|
##
|
||||||
|
## Double timestamps are used by default.
|
||||||
|
const ds_use_integer = F &redef;
|
||||||
|
}
|
||||||
|
|
||||||
|
# Default function to postprocess a rotated DataSeries log file. It moves the
|
||||||
|
# rotated file to a new name that includes a timestamp with the opening time, and
|
||||||
|
# then runs the writer's default postprocessor command on it.
|
||||||
|
function default_rotation_postprocessor_func(info: Log::RotationInfo) : bool
|
||||||
|
{
|
||||||
|
# Move file to name including both opening and closing time.
|
||||||
|
local dst = fmt("%s.%s.ds", info$path,
|
||||||
|
strftime(Log::default_rotation_date_format, info$open));
|
||||||
|
|
||||||
|
system(fmt("/bin/mv %s %s", info$fname, dst));
|
||||||
|
|
||||||
|
# Run default postprocessor.
|
||||||
|
return Log::run_rotation_postprocessor_cmd(info, dst);
|
||||||
|
}
|
||||||
|
|
||||||
|
redef Log::default_rotation_postprocessors += { [Log::WRITER_DATASERIES] = default_rotation_postprocessor_func };
|
|
@ -419,6 +419,7 @@ set(bro_SRCS
|
||||||
logging/WriterBackend.cc
|
logging/WriterBackend.cc
|
||||||
logging/WriterFrontend.cc
|
logging/WriterFrontend.cc
|
||||||
logging/writers/Ascii.cc
|
logging/writers/Ascii.cc
|
||||||
|
logging/writers/DataSeries.cc
|
||||||
logging/writers/None.cc
|
logging/writers/None.cc
|
||||||
|
|
||||||
${dns_SRCS}
|
${dns_SRCS}
|
||||||
|
|
|
@ -72,3 +72,11 @@ const set_separator: string;
|
||||||
const empty_field: string;
|
const empty_field: string;
|
||||||
const unset_field: string;
|
const unset_field: string;
|
||||||
|
|
||||||
|
# Options for the DataSeries writer.
|
||||||
|
|
||||||
|
module LogDataSeries;
|
||||||
|
|
||||||
|
const ds_compression: string;
|
||||||
|
const ds_extent_size: count;
|
||||||
|
const ds_dump_schema: bool;
|
||||||
|
const ds_num_threads: count;
|
||||||
|
|
|
@ -16,9 +16,11 @@
|
||||||
#include "writers/Ascii.h"
|
#include "writers/Ascii.h"
|
||||||
#include "writers/None.h"
|
#include "writers/None.h"
|
||||||
|
|
||||||
|
#ifdef USE_DATASERIES
|
||||||
|
#include "writers/DataSeries.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
using namespace logging;
|
using namespace logging;
|
||||||
using threading::Value;
|
|
||||||
using threading::Field;
|
|
||||||
|
|
||||||
// Structure describing a log writer type.
|
// Structure describing a log writer type.
|
||||||
struct WriterDefinition {
|
struct WriterDefinition {
|
||||||
|
@ -32,6 +34,9 @@ struct WriterDefinition {
|
||||||
WriterDefinition log_writers[] = {
|
WriterDefinition log_writers[] = {
|
||||||
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
|
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
|
||||||
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
|
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
|
||||||
|
#ifdef USE_DATASERIES
|
||||||
|
{ BifEnum::Log::WRITER_DATASERIES, "DataSeries", 0, writer::DataSeries::Instantiate },
|
||||||
|
#endif
|
||||||
|
|
||||||
// End marker, don't touch.
|
// End marker, don't touch.
|
||||||
{ BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)(WriterFrontend* frontend))0 }
|
{ BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)(WriterFrontend* frontend))0 }
|
||||||
|
@ -51,7 +56,7 @@ struct Manager::Filter {
|
||||||
Func* postprocessor;
|
Func* postprocessor;
|
||||||
|
|
||||||
int num_fields;
|
int num_fields;
|
||||||
Field** fields;
|
threading::Field** fields;
|
||||||
|
|
||||||
// Vector indexed by field number. Each element is a list of record
|
// Vector indexed by field number. Each element is a list of record
|
||||||
// indices defining a path leading to the value across potential
|
// indices defining a path leading to the value across potential
|
||||||
|
@ -127,6 +132,17 @@ Manager::~Manager()
|
||||||
delete *s;
|
delete *s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
list<string> Manager::SupportedFormats()
|
||||||
|
{
|
||||||
|
list<string> formats;
|
||||||
|
|
||||||
|
for ( WriterDefinition* ld = log_writers; ld->type != BifEnum::Log::WRITER_DEFAULT; ++ld )
|
||||||
|
formats.push_back(ld->name);
|
||||||
|
|
||||||
|
return formats;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
|
WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
|
||||||
{
|
{
|
||||||
WriterDefinition* ld = log_writers;
|
WriterDefinition* ld = log_writers;
|
||||||
|
@ -135,7 +151,7 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
|
||||||
{
|
{
|
||||||
if ( ld->type == BifEnum::Log::WRITER_DEFAULT )
|
if ( ld->type == BifEnum::Log::WRITER_DEFAULT )
|
||||||
{
|
{
|
||||||
reporter->Error("unknow writer when creating writer");
|
reporter->Error("unknown writer type requested");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,10 +175,8 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
|
||||||
// function.
|
// function.
|
||||||
ld->factory = 0;
|
ld->factory = 0;
|
||||||
|
|
||||||
DBG_LOG(DBG_LOGGING, "failed to init writer class %s",
|
reporter->Error("initialization of writer %s failed", ld->name);
|
||||||
ld->name);
|
return 0;
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -449,7 +463,7 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
|
||||||
|
|
||||||
filter->indices.push_back(new_indices);
|
filter->indices.push_back(new_indices);
|
||||||
|
|
||||||
filter->fields = (Field**)
|
filter->fields = (threading::Field**)
|
||||||
realloc(filter->fields,
|
realloc(filter->fields,
|
||||||
sizeof(Field) * ++filter->num_fields);
|
sizeof(Field) * ++filter->num_fields);
|
||||||
|
|
||||||
|
@ -459,7 +473,7 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
Field* field = new Field();
|
threading::Field* field = new threading::Field();
|
||||||
field->name = new_path;
|
field->name = new_path;
|
||||||
field->type = t->Tag();
|
field->type = t->Tag();
|
||||||
if ( field->type == TYPE_TABLE )
|
if ( field->type == TYPE_TABLE )
|
||||||
|
@ -572,7 +586,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
|
||||||
|
|
||||||
for ( int i = 0; i < filter->num_fields; i++ )
|
for ( int i = 0; i < filter->num_fields; i++ )
|
||||||
{
|
{
|
||||||
Field* field = filter->fields[i];
|
threading::Field* field = filter->fields[i];
|
||||||
DBG_LOG(DBG_LOGGING, " field %10s: %s",
|
DBG_LOG(DBG_LOGGING, " field %10s: %s",
|
||||||
field->name.c_str(), type_name(field->type));
|
field->name.c_str(), type_name(field->type));
|
||||||
}
|
}
|
||||||
|
@ -744,10 +758,10 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
||||||
|
|
||||||
// Copy the fields for WriterFrontend::Init() as it
|
// Copy the fields for WriterFrontend::Init() as it
|
||||||
// will take ownership.
|
// will take ownership.
|
||||||
Field** arg_fields = new Field*[filter->num_fields];
|
threading::Field** arg_fields = new threading::Field*[filter->num_fields];
|
||||||
|
|
||||||
for ( int j = 0; j < filter->num_fields; ++j )
|
for ( int j = 0; j < filter->num_fields; ++j )
|
||||||
arg_fields[j] = new Field(*filter->fields[j]);
|
arg_fields[j] = new threading::Field(*filter->fields[j]);
|
||||||
|
|
||||||
writer = CreateWriter(stream->id, filter->writer,
|
writer = CreateWriter(stream->id, filter->writer,
|
||||||
path, filter->num_fields,
|
path, filter->num_fields,
|
||||||
|
@ -898,10 +912,10 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
|
||||||
return lval;
|
return lval;
|
||||||
}
|
}
|
||||||
|
|
||||||
Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
|
threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
|
||||||
RecordVal* columns)
|
RecordVal* columns)
|
||||||
{
|
{
|
||||||
Value** vals = new Value*[filter->num_fields];
|
threading::Value** vals = new threading::Value*[filter->num_fields];
|
||||||
|
|
||||||
for ( int i = 0; i < filter->num_fields; ++i )
|
for ( int i = 0; i < filter->num_fields; ++i )
|
||||||
{
|
{
|
||||||
|
@ -920,7 +934,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
|
||||||
if ( ! val )
|
if ( ! val )
|
||||||
{
|
{
|
||||||
// Value, or any of its parents, is not set.
|
// Value, or any of its parents, is not set.
|
||||||
vals[i] = new Value(filter->fields[i]->type, false);
|
vals[i] = new threading::Value(filter->fields[i]->type, false);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -933,7 +947,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
|
||||||
}
|
}
|
||||||
|
|
||||||
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
||||||
int num_fields, const Field* const* fields, bool local, bool remote)
|
int num_fields, const threading::Field* const* fields, bool local, bool remote)
|
||||||
{
|
{
|
||||||
Stream* stream = FindStream(id);
|
Stream* stream = FindStream(id);
|
||||||
|
|
||||||
|
@ -997,7 +1011,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
||||||
return writer_obj;
|
return writer_obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::DeleteVals(int num_fields, Value** vals)
|
void Manager::DeleteVals(int num_fields, threading::Value** vals)
|
||||||
{
|
{
|
||||||
// Note this code is duplicated in WriterBackend::DeleteVals().
|
// Note this code is duplicated in WriterBackend::DeleteVals().
|
||||||
for ( int i = 0; i < num_fields; i++ )
|
for ( int i = 0; i < num_fields; i++ )
|
||||||
|
@ -1007,7 +1021,7 @@ void Manager::DeleteVals(int num_fields, Value** vals)
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
|
bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
|
||||||
Value** vals)
|
threading::Value** vals)
|
||||||
{
|
{
|
||||||
Stream* stream = FindStream(id);
|
Stream* stream = FindStream(id);
|
||||||
|
|
||||||
|
@ -1116,8 +1130,10 @@ void Manager::Terminate()
|
||||||
{
|
{
|
||||||
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
|
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
|
||||||
{
|
{
|
||||||
if ( *s )
|
if ( ! *s )
|
||||||
Flush((*s)->id);
|
continue;
|
||||||
|
|
||||||
|
Flush((*s)->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,6 @@ class RotationTimer;
|
||||||
|
|
||||||
namespace logging {
|
namespace logging {
|
||||||
|
|
||||||
|
|
||||||
class WriterBackend;
|
class WriterBackend;
|
||||||
class WriterFrontend;
|
class WriterFrontend;
|
||||||
class RotationFinishedMessage;
|
class RotationFinishedMessage;
|
||||||
|
@ -56,7 +55,7 @@ public:
|
||||||
* logging.bif, which just forwards here.
|
* logging.bif, which just forwards here.
|
||||||
*/
|
*/
|
||||||
bool EnableStream(EnumVal* id);
|
bool EnableStream(EnumVal* id);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Disables a log stream.
|
* Disables a log stream.
|
||||||
*
|
*
|
||||||
|
@ -145,6 +144,11 @@ public:
|
||||||
*/
|
*/
|
||||||
void Terminate();
|
void Terminate();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of supported output formats.
|
||||||
|
*/
|
||||||
|
static list<string> SupportedFormats();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
friend class WriterFrontend;
|
friend class WriterFrontend;
|
||||||
friend class RotationFinishedMessage;
|
friend class RotationFinishedMessage;
|
||||||
|
|
|
@ -222,17 +222,6 @@ bool WriterBackend::Flush()
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool WriterBackend::Finish()
|
|
||||||
{
|
|
||||||
if ( ! DoFlush() )
|
|
||||||
{
|
|
||||||
DisableFrontend();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool WriterBackend::DoHeartbeat(double network_time, double current_time)
|
bool WriterBackend::DoHeartbeat(double network_time, double current_time)
|
||||||
{
|
{
|
||||||
MsgThread::DoHeartbeat(network_time, current_time);
|
MsgThread::DoHeartbeat(network_time, current_time);
|
||||||
|
|
|
@ -101,15 +101,6 @@ public:
|
||||||
*/
|
*/
|
||||||
bool Rotate(string rotated_path, double open, double close, bool terminating);
|
bool Rotate(string rotated_path, double open, double close, bool terminating);
|
||||||
|
|
||||||
/**
|
|
||||||
* Finishes writing to this logger in a regularl fashion. Must not be
|
|
||||||
* called if an error has been indicated earlier. After calling this,
|
|
||||||
* no further writing must be performed.
|
|
||||||
*
|
|
||||||
* @return False if an error occured.
|
|
||||||
*/
|
|
||||||
bool Finish();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Disables the frontend that has instantiated this backend. Once
|
* Disables the frontend that has instantiated this backend. Once
|
||||||
* disabled,the frontend will not send any further message over.
|
* disabled,the frontend will not send any further message over.
|
||||||
|
@ -175,6 +166,8 @@ public:
|
||||||
string Render(const threading::Value::subnet_t& subnet) const;
|
string Render(const threading::Value::subnet_t& subnet) const;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
friend class FinishMessage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writer-specific intialization method.
|
* Writer-specific intialization method.
|
||||||
*
|
*
|
||||||
|
@ -272,26 +265,18 @@ protected:
|
||||||
bool terminating) = 0;
|
bool terminating) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writer-specific method implementing log output finalization at
|
* Writer-specific method called just before the threading system is
|
||||||
* termination. Not called when any of the other methods has
|
* going to shutdown.
|
||||||
* previously signaled an error, i.e., executing this method signals
|
*
|
||||||
* a regular shutdown of the writer.
|
* This method can be overridden but one must call
|
||||||
*
|
* WriterBackend::DoFinish().
|
||||||
* A writer implementation must override this method but it can just
|
|
||||||
* ignore calls if flushing doesn't align with its semantics.
|
|
||||||
*
|
|
||||||
* If the method returns false, it will be assumed that a fatal error
|
|
||||||
* has occured that prevents the writer from further operation; it
|
|
||||||
* will then be disabled and eventually deleted. When returning
|
|
||||||
* false, an implementation should also call Error() to indicate what
|
|
||||||
* happened.
|
|
||||||
*/
|
*/
|
||||||
virtual bool DoFinish() = 0;
|
virtual bool DoFinish() { return MsgThread::DoFinish(); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Triggered by regular heartbeat messages from the main thread.
|
* Triggered by regular heartbeat messages from the main thread.
|
||||||
*
|
*
|
||||||
* This method can be overridden but once must call
|
* This method can be overridden but one must call
|
||||||
* WriterBackend::DoHeartbeat().
|
* WriterBackend::DoHeartbeat().
|
||||||
*/
|
*/
|
||||||
virtual bool DoHeartbeat(double network_time, double current_time);
|
virtual bool DoHeartbeat(double network_time, double current_time);
|
||||||
|
|
|
@ -90,7 +90,7 @@ public:
|
||||||
FinishMessage(WriterBackend* backend)
|
FinishMessage(WriterBackend* backend)
|
||||||
: threading::InputMessage<WriterBackend>("Finish", backend) {}
|
: threading::InputMessage<WriterBackend>("Finish", backend) {}
|
||||||
|
|
||||||
virtual bool Process() { return Object()->Finish(); }
|
virtual bool Process() { return Object()->DoFinish(); }
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -117,8 +117,9 @@ WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool ar
|
||||||
if ( local )
|
if ( local )
|
||||||
{
|
{
|
||||||
backend = log_mgr->CreateBackend(this, writer->AsEnum());
|
backend = log_mgr->CreateBackend(this, writer->AsEnum());
|
||||||
assert(backend);
|
|
||||||
backend->Start();
|
if ( backend )
|
||||||
|
backend->Start();
|
||||||
}
|
}
|
||||||
|
|
||||||
else
|
else
|
||||||
|
|
|
@ -69,8 +69,7 @@ bool Ascii::WriteHeaderField(const string& key, const string& val)
|
||||||
return (fwrite(str.c_str(), str.length(), 1, file) == 1);
|
return (fwrite(str.c_str(), str.length(), 1, file) == 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Ascii::DoInit(string path, int num_fields,
|
bool Ascii::DoInit(string path, int num_fields, const Field* const * fields)
|
||||||
const Field* const * fields)
|
|
||||||
{
|
{
|
||||||
if ( output_to_stdout )
|
if ( output_to_stdout )
|
||||||
path = "/dev/stdout";
|
path = "/dev/stdout";
|
||||||
|
@ -146,7 +145,7 @@ bool Ascii::DoFlush()
|
||||||
|
|
||||||
bool Ascii::DoFinish()
|
bool Ascii::DoFinish()
|
||||||
{
|
{
|
||||||
return true;
|
return WriterBackend::DoFinish();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
|
bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
|
||||||
|
|
476
src/logging/writers/DataSeries.cc
Normal file
476
src/logging/writers/DataSeries.cc
Normal file
|
@ -0,0 +1,476 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
|
||||||
|
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
#include <DataSeries/GeneralField.hpp>
|
||||||
|
|
||||||
|
#include "NetVar.h"
|
||||||
|
#include "threading/SerialTypes.h"
|
||||||
|
|
||||||
|
#include "DataSeries.h"
|
||||||
|
|
||||||
|
using namespace logging;
|
||||||
|
using namespace writer;
|
||||||
|
|
||||||
|
// NOTE: Naming conventions are a little bit scattershot at the moment.
|
||||||
|
// Within the scope of this file, a function name prefixed by '_' denotes a
|
||||||
|
// static function.
|
||||||
|
|
||||||
|
// ************************ LOCAL PROTOTYPES *********************************
|
||||||
|
|
||||||
|
struct SchemaValue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Turns a log value into a std::string. Uses an ostringstream to do the
|
||||||
|
* heavy lifting, but still need to switch on the type to know which value
|
||||||
|
* in the union to give to the string string for processing.
|
||||||
|
*
|
||||||
|
* @param val The value we wish to convert to a string
|
||||||
|
* @return the string value of val
|
||||||
|
*/
|
||||||
|
static std::string _LogValueToString(threading::Value* val);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes a field type and converts it to a relevant DataSeries type.
|
||||||
|
*
|
||||||
|
* @param field We extract the type from this and convert it into a relevant DS type.
|
||||||
|
* @return String representation of type that DataSeries can understand.
|
||||||
|
*/
|
||||||
|
static string _GetDSFieldType(const threading::Field* field);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes a field type and converts it to a readable string.
|
||||||
|
*
|
||||||
|
* @param field We extract the type from this and convert it into a readable string.
|
||||||
|
* @return String representation of the field's type
|
||||||
|
*/
|
||||||
|
static string _GetBroTypeString(const threading::Field *field);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Takes a list of types, a list of names, and a title, and uses it to construct a valid DataSeries XML schema
|
||||||
|
* thing, which is then returned as a std::string
|
||||||
|
*
|
||||||
|
* @param opts std::vector of strings containing a list of options to be appended to each field (e.g. "pack_relative=yes")
|
||||||
|
* @param sTitle Name of this schema. Ideally, these schemas would be aggregated and re-used.
|
||||||
|
*/
|
||||||
|
static string _BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Are there any options we should put into the XML schema?
|
||||||
|
*
|
||||||
|
* @param field We extract the type from this and return any options that make sense for that type.
|
||||||
|
* @return Options that can be added directly to the XML (e.g. "pack_relative=\"yes\"")
|
||||||
|
*/
|
||||||
|
static std::string _GetDSOptionsForType(const threading::Field *field);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal helper structure; populate a vector of these which is passed to the XML generator for its use.
|
||||||
|
*/
|
||||||
|
struct SchemaValue
|
||||||
|
{
|
||||||
|
string ds_type;
|
||||||
|
string bro_type;
|
||||||
|
string field_name;
|
||||||
|
string field_options;
|
||||||
|
|
||||||
|
SchemaValue(const threading::Field *field)
|
||||||
|
{
|
||||||
|
ds_type = _GetDSFieldType(field);
|
||||||
|
field_name = string(field->name);
|
||||||
|
field_options = _GetDSOptionsForType(field);
|
||||||
|
bro_type = _GetBroTypeString(field);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// ************************ LOCAL IMPL *********************************
|
||||||
|
|
||||||
|
std::string DataSeries::LogValueToString(threading::Value *val)
|
||||||
|
{
|
||||||
|
const int strsz = 1024;
|
||||||
|
char strbuf[strsz];
|
||||||
|
|
||||||
|
// In some cases, no value is attached. If this is the case, return an empty string.
|
||||||
|
if(!val->present)
|
||||||
|
return "";
|
||||||
|
|
||||||
|
std::ostringstream ostr;
|
||||||
|
switch(val->type)
|
||||||
|
{
|
||||||
|
case TYPE_BOOL:
|
||||||
|
return (val->val.int_val ? "true" : "false");
|
||||||
|
|
||||||
|
case TYPE_INT:
|
||||||
|
ostr << val->val.int_val;
|
||||||
|
return ostr.str();
|
||||||
|
|
||||||
|
case TYPE_COUNT:
|
||||||
|
case TYPE_COUNTER:
|
||||||
|
case TYPE_PORT:
|
||||||
|
ostr << val->val.uint_val;
|
||||||
|
return ostr.str();
|
||||||
|
|
||||||
|
case TYPE_SUBNET:
|
||||||
|
ostr << Render(val->val.subnet_val);
|
||||||
|
return ostr.str();
|
||||||
|
|
||||||
|
case TYPE_ADDR:
|
||||||
|
ostr << Render(val->val.addr_val);
|
||||||
|
return ostr.str();
|
||||||
|
|
||||||
|
// Note: These two cases are relatively special. We need to convert these values into their integer equivalents
|
||||||
|
// to maximize precision. At the moment, there won't be a noticeable effect (Bro uses the double format everywhere
|
||||||
|
// internally, so we've already lost the precision we'd gain here), but timestamps may eventually switch to this
|
||||||
|
// representation within Bro.
|
||||||
|
//
|
||||||
|
// in the near-term, this *should* lead to better pack_relative (and thus smaller output files).
|
||||||
|
case TYPE_TIME:
|
||||||
|
case TYPE_INTERVAL:
|
||||||
|
ostr << (unsigned long)(DataSeries::TIME_SCALE * val->val.double_val);
|
||||||
|
return ostr.str();
|
||||||
|
|
||||||
|
case TYPE_DOUBLE:
|
||||||
|
ostr << val->val.double_val;
|
||||||
|
return ostr.str();
|
||||||
|
|
||||||
|
case TYPE_ENUM:
|
||||||
|
case TYPE_STRING:
|
||||||
|
case TYPE_FILE:
|
||||||
|
{
|
||||||
|
int size = val->val.string_val->size();
|
||||||
|
string tmpString = "";
|
||||||
|
if(size)
|
||||||
|
tmpString = string(val->val.string_val->data(), val->val.string_val->size());
|
||||||
|
else
|
||||||
|
tmpString = string("");
|
||||||
|
return tmpString;
|
||||||
|
}
|
||||||
|
case TYPE_TABLE:
|
||||||
|
{
|
||||||
|
if ( ! val->val.set_val.size )
|
||||||
|
{
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
string tmpString = "";
|
||||||
|
for ( int j = 0; j < val->val.set_val.size; j++ )
|
||||||
|
{
|
||||||
|
if ( j > 0 )
|
||||||
|
tmpString += ":"; //TODO: Specify set separator char in configuration.
|
||||||
|
|
||||||
|
tmpString += LogValueToString(val->val.set_val.vals[j]);
|
||||||
|
}
|
||||||
|
return tmpString;
|
||||||
|
}
|
||||||
|
case TYPE_VECTOR:
|
||||||
|
{
|
||||||
|
if ( ! val->val.vector_val.size )
|
||||||
|
{
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
string tmpString = "";
|
||||||
|
for ( int j = 0; j < val->val.vector_val.size; j++ )
|
||||||
|
{
|
||||||
|
if ( j > 0 )
|
||||||
|
tmpString += ":"; //TODO: Specify set separator char in configuration.
|
||||||
|
|
||||||
|
tmpString += LogValueToString(val->val.vector_val.vals[j]);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tmpString;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return "???";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static string _GetDSFieldType(const threading::Field *field)
|
||||||
|
{
|
||||||
|
switch(field->type)
|
||||||
|
{
|
||||||
|
case TYPE_BOOL:
|
||||||
|
return "bool";
|
||||||
|
|
||||||
|
case TYPE_COUNT:
|
||||||
|
case TYPE_COUNTER:
|
||||||
|
case TYPE_PORT:
|
||||||
|
case TYPE_INT:
|
||||||
|
case TYPE_TIME:
|
||||||
|
case TYPE_INTERVAL:
|
||||||
|
return "int64";
|
||||||
|
|
||||||
|
case TYPE_DOUBLE:
|
||||||
|
return "double";
|
||||||
|
|
||||||
|
case TYPE_SUBNET:
|
||||||
|
case TYPE_ADDR:
|
||||||
|
case TYPE_ENUM:
|
||||||
|
case TYPE_STRING:
|
||||||
|
case TYPE_FILE:
|
||||||
|
case TYPE_TABLE:
|
||||||
|
case TYPE_VECTOR:
|
||||||
|
default:
|
||||||
|
return "variable32";
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the script-level Bro type name for a log field, purely for
// human consumption (embedded as XML comments in the schema).
static string _GetBroTypeString(const threading::Field *field)
	{
	switch ( field->type )
		{
		case TYPE_BOOL:		return "bool";
		case TYPE_COUNT:	return "count";
		case TYPE_COUNTER:	return "counter";
		case TYPE_PORT:		return "port";
		case TYPE_INT:		return "int";
		case TYPE_TIME:		return "time";
		case TYPE_INTERVAL:	return "interval";
		case TYPE_DOUBLE:	return "double";
		case TYPE_SUBNET:	return "subnet";
		case TYPE_ADDR:		return "addr";
		case TYPE_ENUM:		return "enum";
		case TYPE_STRING:	return "string";
		case TYPE_FILE:		return "file";
		case TYPE_TABLE:	return "table";
		case TYPE_VECTOR:	return "vector";
		default:		return "???";
		}
	}
|
||||||
|
|
||||||
|
// Build the DataSeries ExtentType XML schema describing one log stream.
// Each field contributes a <field> element; after the closing tag, a set
// of XML comments records the original Bro type of every field.
static string _BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle)
	{
	// Fall back to a generic stream name when none was supplied.
	if ( sTitle == "" )
		sTitle = "GenericBroStream";

	string xmlschema = "<ExtentType name=\"" + sTitle + "\" version=\"1.0\" namespace=\"bro-ids.org\">\n";

	for ( size_t i = 0; i < vals.size(); ++i )
		xmlschema += "\t<field type=\"" + vals[i].ds_type + "\" name=\"" + vals[i].field_name + "\" " + vals[i].field_options + "/>\n";

	xmlschema += "</ExtentType>\n";

	// Trailing comments documenting the Bro-side types.
	for ( size_t i = 0; i < vals.size(); ++i )
		xmlschema += "<!-- " + vals[i].field_name + " : " + vals[i].bro_type + " -->\n";

	return xmlschema;
	}
|
||||||
|
|
||||||
|
// Choose DataSeries packing options for a field, returning the option
// attribute string to embed in the schema ("" when none applies).
static std::string _GetDSOptionsForType(const threading::Field *field)
	{
	// Temporal values: delta-encode the field against itself.
	if ( field->type == TYPE_TIME || field->type == TYPE_INTERVAL )
		return "pack_relative=\"" + std::string(field->name) + "\"";

	// Variable-length text-like values: deduplicate repeated strings.
	if ( field->type == TYPE_SUBNET || field->type == TYPE_ADDR
	     || field->type == TYPE_ENUM || field->type == TYPE_STRING
	     || field->type == TYPE_FILE || field->type == TYPE_TABLE
	     || field->type == TYPE_VECTOR )
		return "pack_unique=\"yes\"";

	return "";
	}
|
||||||
|
|
||||||
|
// ************************ CLASS IMPL *********************************
|
||||||
|
|
||||||
|
DataSeries::DataSeries(WriterFrontend* frontend) : WriterBackend(frontend)
	{
	// Copy the script-level LogDataSeries::* configuration constants into
	// members. NOTE(review): presumably cached here so later writer-thread
	// calls need not touch script-layer state -- confirm.
	ds_compression = string((const char *)BifConst::LogDataSeries::ds_compression->Bytes(), BifConst::LogDataSeries::ds_compression->Len());
	ds_dump_schema = BifConst::LogDataSeries::ds_dump_schema;
	ds_extent_size = BifConst::LogDataSeries::ds_extent_size;
	ds_num_threads = BifConst::LogDataSeries::ds_num_threads;
	}
|
||||||
|
|
||||||
|
DataSeries::~DataSeries()
	{
	// Nothing to do here: the DataSeries output structures (log_output,
	// log_file, extents) are torn down in DoFinish().
	}
|
||||||
|
|
||||||
|
bool DataSeries::DoInit(string path, int num_fields, const threading::Field* const * fields)
	{
	// We first construct an XML schema thing (and, if ds_dump_schema is
	// set, dump it to path + ".ds.xml"). Assuming that goes well, we
	// use that schema to build our output logfile and prepare it to be
	// written to.

	// Note: compressor count must be set *BEFORE* DataSeriesSink is instantiated.
	if ( ds_num_threads < THREAD_MIN && ds_num_threads != 0 )
		{
		fprintf(stderr, "%d is too few threads! Using %d instead\n", (int)ds_num_threads, (int)THREAD_MIN);
		ds_num_threads = THREAD_MIN;
		}

	if ( ds_num_threads > THREAD_MAX )
		{
		fprintf(stderr, "%d is too many threads! Dropping back to %d\n", (int)ds_num_threads, (int)THREAD_MAX);
		ds_num_threads = THREAD_MAX;
		}

	if ( ds_num_threads > 0 )
		DataSeriesSink::setCompressorCount(ds_num_threads);

	// One SchemaValue per log field; these drive both the XML schema
	// and the per-field extent accessors created below.
	vector<SchemaValue> schema_list;

	for ( int i = 0; i < num_fields; i++ )
		{
		const threading::Field* field = fields[i];
		SchemaValue val(field);
		schema_list.push_back(val);
		}

	string schema = _BuildDSSchemaFromFieldTypes(schema_list, path);

	if ( ds_dump_schema )
		{
		FILE* pFile = fopen(string(path + ".ds.xml").c_str(), "wb");

		if ( NULL == pFile )
			// Fix: previously we reported the error but still passed
			// the NULL handle to fwrite()/fclose(), crashing the writer.
			// The dump is best-effort, so just report and carry on.
			perror("Could not dump schema");
		else
			{
			fwrite(schema.c_str(), 1, schema.length(), pFile);
			fclose(pFile);
			}
		}

	// Map the user-supplied compression name onto DataSeries' enum;
	// unknown names fall back to "any" (compress_all) with a warning.
	int compress_type = Extent::compress_all;

	if ( ds_compression == "lzf" )
		compress_type = Extent::compress_lzf;

	else if ( ds_compression == "lzo" )
		compress_type = Extent::compress_lzo;

	else if ( ds_compression == "gz" )
		compress_type = Extent::compress_gz;

	else if ( ds_compression == "bz2" )
		compress_type = Extent::compress_bz2;

	else if ( ds_compression == "none" )
		compress_type = Extent::compress_none;

	else if ( ds_compression == "any" )
		compress_type = Extent::compress_all;

	else
		{
		fprintf(stderr, "%s is not a valid compression type. Valid types are: 'lzf', 'lzo', 'gz', 'bz2', 'none', 'any'\n", ds_compression.c_str());
		fprintf(stderr, "Defaulting to 'any'\n");
		}

	// Register the schema and wire up the output chain:
	// sink (file) -> output module (extent buffering) -> series (rows).
	log_type = const_cast<ExtentType *>(log_types.registerType(schema));

	log_series.setType(*log_type);
	log_file = new DataSeriesSink(path + ".ds", compress_type);
	log_file->writeExtentLibrary(log_types);

	for ( size_t i = 0; i < schema_list.size(); ++i )
		extents.insert(std::make_pair(schema_list[i].field_name, GeneralField::create(log_series, schema_list[i].field_name)));

	// Clamp the configured extent size into DataSeries' sane range.
	if ( ds_extent_size < ROW_MIN )
		{
		fprintf(stderr, "%d is not a valid value for 'rows'. Using min of %d instead.\n", (int)ds_extent_size, (int)ROW_MIN);
		ds_extent_size = ROW_MIN;
		}

	else if ( ds_extent_size > ROW_MAX )
		{
		fprintf(stderr, "%d is not a valid value for 'rows'. Using max of %d instead.\n", (int)ds_extent_size, (int)ROW_MAX);
		ds_extent_size = ROW_MAX;
		}

	log_output = new OutputModule(*log_file, log_series, log_type, ds_extent_size);

	return true;
	}
|
||||||
|
|
||||||
|
bool DataSeries::DoFlush()
	{
	// Nothing to do: DataSeries manages its own buffering and flushes
	// extents on its own schedule.
	return true;
	}
|
||||||
|
|
||||||
|
bool DataSeries::DoFinish()
|
||||||
|
{
|
||||||
|
for(ExtentIterator iter = extents.begin();
|
||||||
|
iter != extents.end(); ++iter)
|
||||||
|
{
|
||||||
|
delete iter->second;
|
||||||
|
}
|
||||||
|
extents.clear();
|
||||||
|
// Don't delete the file before you delete the output, or bad things happen.
|
||||||
|
delete log_output;
|
||||||
|
delete log_file;
|
||||||
|
|
||||||
|
return WriterBackend::DoFinish();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
			 threading::Value** vals)
	{
	// Start a fresh row, then fill in each column whose value is
	// present; absent values leave the column at its default.
	log_output->newRecord();

	for ( int i = 0; i < num_fields; ++i )
		{
		ExtentIterator entry = extents.find(fields[i]->name);
		assert(entry != extents.end());

		if ( entry == extents.end() )
			continue;

		if ( vals[i]->present )
			entry->second->set(LogValueToString(vals[i]));
		}

	return true;
	}
|
||||||
|
|
||||||
|
bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating)
	{
	// Note that if DS files are rotated too often, the aggregate log size will be (much) larger.

	// Close out the current file and release all output structures.
	DoFinish();

	string dsname = Path() + ".ds";
	string nname = rotated_path + ".ds";

	// Fix: the rename() result was previously ignored; on failure the
	// old file would remain in place while post-processing targeted a
	// non-existent path.
	if ( rename(dsname.c_str(), nname.c_str()) != 0 )
		{
		Error(Fmt("error renaming %s to %s", dsname.c_str(), nname.c_str()));
		return false;
		}

	if ( ! FinishedRotation(nname, dsname, open, close, terminating) )
		{
		Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str()));
		return false;
		}

	// Reopen a fresh .ds file under the original path.
	return DoInit(Path(), NumFields(), Fields());
	}
|
||||||
|
|
||||||
|
bool DataSeries::DoSetBuf(bool enabled)
	{
	// DataSeries is *always* buffered to some degree. This option is ignored.
	// Returns true unconditionally so the frontend treats the call as handled.
	return true;
	}
|
69
src/logging/writers/DataSeries.h
Normal file
69
src/logging/writers/DataSeries.h
Normal file
|
@ -0,0 +1,69 @@
|
||||||
|
// See the file "COPYING" in the main distribution directory for copyright.
//
// A binary log writer producing DataSeries output. See doc/data-series.rst
// for more information.

#ifndef LOGGING_WRITER_DATA_SERIES_H
#define LOGGING_WRITER_DATA_SERIES_H

#include "../WriterBackend.h"

#include <DataSeries/ExtentType.hpp>
#include <DataSeries/DataSeriesFile.hpp>
#include <DataSeries/DataSeriesModule.hpp>
#include <DataSeries/GeneralField.hpp>

namespace logging { namespace writer {

class DataSeries : public WriterBackend {
public:
	DataSeries(WriterFrontend* frontend);
	~DataSeries();

	// Factory method handed to the logging manager.
	static WriterBackend* Instantiate(WriterFrontend* frontend)
		{ return new DataSeries(frontend); }

protected:
	// WriterBackend interface.
	virtual bool DoInit(string path, int num_fields,
			    const threading::Field* const * fields);

	virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
			     threading::Value** vals);
	virtual bool DoSetBuf(bool enabled);
	virtual bool DoRotate(string rotated_path, double open,
			      double close, bool terminating);
	virtual bool DoFlush();
	virtual bool DoFinish();

private:
	static const size_t ROW_MIN = 2048;			// Minimum extent size.
	static const size_t ROW_MAX = (1024 * 1024 * 100);	// Maximum extent size.
	static const size_t THREAD_MIN = 1;	// Minimum number of compression threads that DataSeries may spawn.
	static const size_t THREAD_MAX = 128;	// Maximum number of compression threads that DataSeries may spawn.
	static const size_t TIME_SCALE = 1000000;	// Fixed-point multiplier for time values when converted to integers.

	// Renders a single log value into the string form stored in the file.
	std::string LogValueToString(threading::Value *val);

	// Maps a log field name to the GeneralField used to write that column.
	typedef std::map<string, GeneralField *> ExtentMap;
	typedef ExtentMap::iterator ExtentIterator;

	// Internal DataSeries structures we need to keep track of.
	DataSeriesSink* log_file;	// Sink owning the on-disk .ds file.
	ExtentTypeLibrary log_types;	// Library the schema is registered in.
	ExtentType *log_type;		// Extent type built from the log schema.
	ExtentSeries log_series;	// Series rows are written through.
	OutputModule* log_output;	// Buffers rows into extents of ds_extent_size.
	ExtentMap extents;		// Per-field column accessors, keyed by name.

	// Options set from the script-level.
	uint64 ds_extent_size;		// Rows per extent (clamped to ROW_MIN..ROW_MAX).
	uint64 ds_num_threads;		// Compressor thread count (0 = library default).
	string ds_compression;		// One of: lzf, lzo, gz, bz2, none, any.
	bool ds_dump_schema;		// If true, also write the schema to <path>.ds.xml.
};

}
}

#endif
|
||||||
|
|
21
src/main.cc
21
src/main.cc
|
@ -201,6 +201,27 @@ void usage()
|
||||||
fprintf(stderr, " $BRO_LOG_SUFFIX | ASCII log file extension (.%s)\n", logging::writer::Ascii::LogExt().c_str());
|
fprintf(stderr, " $BRO_LOG_SUFFIX | ASCII log file extension (.%s)\n", logging::writer::Ascii::LogExt().c_str());
|
||||||
fprintf(stderr, " $BRO_PROFILER_FILE | Output file for script execution statistics (not set)\n");
|
fprintf(stderr, " $BRO_PROFILER_FILE | Output file for script execution statistics (not set)\n");
|
||||||
|
|
||||||
|
fprintf(stderr, "\n");
|
||||||
|
fprintf(stderr, " Supported log formats: ");
|
||||||
|
|
||||||
|
bool first = true;
|
||||||
|
list<string> fmts = logging::Manager::SupportedFormats();
|
||||||
|
|
||||||
|
for ( list<string>::const_iterator i = fmts.begin(); i != fmts.end(); ++i )
|
||||||
|
{
|
||||||
|
if ( *i == "None" )
|
||||||
|
// Skip, it's uninteresting.
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if ( ! first )
|
||||||
|
fprintf(stderr, ",");
|
||||||
|
|
||||||
|
fprintf(stderr, "%s", (*i).c_str());
|
||||||
|
first = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
fprintf(stderr, "\n");
|
||||||
|
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -162,6 +162,7 @@ enum Writer %{
|
||||||
WRITER_DEFAULT,
|
WRITER_DEFAULT,
|
||||||
WRITER_NONE,
|
WRITER_NONE,
|
||||||
WRITER_ASCII,
|
WRITER_ASCII,
|
||||||
|
WRITER_DATASERIES,
|
||||||
%}
|
%}
|
||||||
|
|
||||||
enum ID %{
|
enum ID %{
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue