mirror of
https://github.com/zeek/zeek.git
synced 2025-10-14 12:38:20 +00:00
DataSeries cleanup.
This commit is contained in:
parent
7131feefbc
commit
891c532775
6 changed files with 162 additions and 181 deletions
|
@ -15,10 +15,9 @@
|
||||||
|
|
||||||
extern int generate_documentation;
|
extern int generate_documentation;
|
||||||
|
|
||||||
|
// Note: This function must be thread-safe.
|
||||||
const char* type_name(TypeTag t)
|
const char* type_name(TypeTag t)
|
||||||
{
|
{
|
||||||
static char errbuf[512];
|
|
||||||
|
|
||||||
static const char* type_names[int(NUM_TYPES)] = {
|
static const char* type_names[int(NUM_TYPES)] = {
|
||||||
"void",
|
"void",
|
||||||
"bool", "int", "count", "counter",
|
"bool", "int", "count", "counter",
|
||||||
|
@ -37,10 +36,7 @@ const char* type_name(TypeTag t)
|
||||||
};
|
};
|
||||||
|
|
||||||
if ( int(t) >= NUM_TYPES )
|
if ( int(t) >= NUM_TYPES )
|
||||||
{
|
return "type_name(): not a type tag";
|
||||||
snprintf(errbuf, sizeof(errbuf), "%d: not a type tag", int(t));
|
|
||||||
return errbuf;
|
|
||||||
}
|
|
||||||
|
|
||||||
return type_names[int(t)];
|
return type_names[int(t)];
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,6 +86,9 @@ bool Ascii::DoInit(string path, int num_fields, const Field* const * fields)
|
||||||
|
|
||||||
if ( include_header )
|
if ( include_header )
|
||||||
{
|
{
|
||||||
|
string names;
|
||||||
|
string types;
|
||||||
|
|
||||||
string str = string(header_prefix, header_prefix_len)
|
string str = string(header_prefix, header_prefix_len)
|
||||||
+ "separator " // Always use space as separator here.
|
+ "separator " // Always use space as separator here.
|
||||||
+ get_escaped_string(string(separator, separator_len), false)
|
+ get_escaped_string(string(separator, separator_len), false)
|
||||||
|
@ -103,9 +106,6 @@ bool Ascii::DoInit(string path, int num_fields, const Field* const * fields)
|
||||||
WriteHeaderField("path", get_escaped_string(path, false))) )
|
WriteHeaderField("path", get_escaped_string(path, false))) )
|
||||||
goto write_error;
|
goto write_error;
|
||||||
|
|
||||||
string names;
|
|
||||||
string types;
|
|
||||||
|
|
||||||
for ( int i = 0; i < num_fields; ++i )
|
for ( int i = 0; i < num_fields; ++i )
|
||||||
{
|
{
|
||||||
if ( i > 0 )
|
if ( i > 0 )
|
||||||
|
@ -114,15 +114,8 @@ bool Ascii::DoInit(string path, int num_fields, const Field* const * fields)
|
||||||
types += string(separator, separator_len);
|
types += string(separator, separator_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
const Field* field = fields[i];
|
names += fields[i]->name;
|
||||||
names += field->name;
|
types += fields[i]->TypeName();
|
||||||
types += type_name(field->type);
|
|
||||||
if ( (field->type == TYPE_TABLE) || (field->type == TYPE_VECTOR) )
|
|
||||||
{
|
|
||||||
types += "[";
|
|
||||||
types += type_name(field->subtype);
|
|
||||||
types += "]";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( ! (WriteHeaderField("fields", names)
|
if ( ! (WriteHeaderField("fields", names)
|
||||||
|
|
|
@ -15,17 +15,15 @@ using namespace logging;
|
||||||
using namespace writer;
|
using namespace writer;
|
||||||
|
|
||||||
std::string DataSeries::LogValueToString(threading::Value *val)
|
std::string DataSeries::LogValueToString(threading::Value *val)
|
||||||
{
|
{
|
||||||
const int strsz = 1024;
|
// In some cases, no value is attached. If this is the case, return
|
||||||
char strbuf[strsz];
|
// an empty string.
|
||||||
|
if( ! val->present )
|
||||||
// In some cases, no value is attached. If this is the case, return an empty string.
|
|
||||||
if(!val->present)
|
|
||||||
return "";
|
return "";
|
||||||
|
|
||||||
std::ostringstream ostr;
|
std::ostringstream ostr;
|
||||||
switch(val->type)
|
|
||||||
{
|
switch(val->type) {
|
||||||
case TYPE_BOOL:
|
case TYPE_BOOL:
|
||||||
return (val->val.int_val ? "true" : "false");
|
return (val->val.int_val ? "true" : "false");
|
||||||
|
|
||||||
|
@ -47,12 +45,15 @@ std::string DataSeries::LogValueToString(threading::Value *val)
|
||||||
ostr << Render(val->val.addr_val);
|
ostr << Render(val->val.addr_val);
|
||||||
return ostr.str();
|
return ostr.str();
|
||||||
|
|
||||||
// Note: These two cases are relatively special. We need to convert these values into their integer equivalents
|
// Note: These two cases are relatively special. We need to convert
|
||||||
// to maximize precision. At the moment, there won't be a noticeable effect (Bro uses the double format everywhere
|
// these values into their integer equivalents to maximize precision.
|
||||||
// internally, so we've already lost the precision we'd gain here), but timestamps may eventually switch to this
|
// At the moment, there won't be a noticeable effect (Bro uses the
|
||||||
// representation within Bro.
|
// double format everywhere internally, so we've already lost the
|
||||||
|
// precision we'd gain here), but timestamps may eventually switch to
|
||||||
|
// this representation within Bro.
|
||||||
//
|
//
|
||||||
// in the near-term, this *should* lead to better pack_relative (and thus smaller output files).
|
// In the near-term, this *should* lead to better pack_relative (and
|
||||||
|
// thus smaller output files).
|
||||||
case TYPE_TIME:
|
case TYPE_TIME:
|
||||||
case TYPE_INTERVAL:
|
case TYPE_INTERVAL:
|
||||||
if ( ds_use_integer_for_time )
|
if ( ds_use_integer_for_time )
|
||||||
|
@ -69,59 +70,57 @@ std::string DataSeries::LogValueToString(threading::Value *val)
|
||||||
case TYPE_ENUM:
|
case TYPE_ENUM:
|
||||||
case TYPE_STRING:
|
case TYPE_STRING:
|
||||||
case TYPE_FILE:
|
case TYPE_FILE:
|
||||||
{
|
case TYPE_FUNC:
|
||||||
int size = val->val.string_val->size();
|
if ( ! val->val.string_val->size() )
|
||||||
string tmpString = "";
|
return "";
|
||||||
if(size)
|
|
||||||
tmpString = string(val->val.string_val->data(), val->val.string_val->size());
|
return string(val->val.string_val->data(), val->val.string_val->size());
|
||||||
else
|
|
||||||
tmpString = string("");
|
|
||||||
return tmpString;
|
|
||||||
}
|
|
||||||
case TYPE_TABLE:
|
case TYPE_TABLE:
|
||||||
{
|
{
|
||||||
if ( ! val->val.set_val.size )
|
if ( ! val->val.set_val.size )
|
||||||
{
|
|
||||||
return "";
|
return "";
|
||||||
}
|
|
||||||
|
|
||||||
string tmpString = "";
|
string tmpString = "";
|
||||||
|
|
||||||
for ( int j = 0; j < val->val.set_val.size; j++ )
|
for ( int j = 0; j < val->val.set_val.size; j++ )
|
||||||
{
|
{
|
||||||
if ( j > 0 )
|
if ( j > 0 )
|
||||||
tmpString += ":"; //TODO: Specify set separator char in configuration.
|
tmpString += ds_set_separator;
|
||||||
|
|
||||||
tmpString += LogValueToString(val->val.set_val.vals[j]);
|
tmpString += LogValueToString(val->val.set_val.vals[j]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return tmpString;
|
return tmpString;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TYPE_VECTOR:
|
case TYPE_VECTOR:
|
||||||
{
|
{
|
||||||
if ( ! val->val.vector_val.size )
|
if ( ! val->val.vector_val.size )
|
||||||
{
|
|
||||||
return "";
|
return "";
|
||||||
}
|
|
||||||
|
|
||||||
string tmpString = "";
|
string tmpString = "";
|
||||||
|
|
||||||
for ( int j = 0; j < val->val.vector_val.size; j++ )
|
for ( int j = 0; j < val->val.vector_val.size; j++ )
|
||||||
{
|
{
|
||||||
if ( j > 0 )
|
if ( j > 0 )
|
||||||
tmpString += ":"; //TODO: Specify set separator char in configuration.
|
tmpString += ds_set_separator;
|
||||||
|
|
||||||
tmpString += LogValueToString(val->val.vector_val.vals[j]);
|
tmpString += LogValueToString(val->val.vector_val.vals[j]);
|
||||||
}
|
}
|
||||||
|
|
||||||
return tmpString;
|
return tmpString;
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return "???";
|
InternalError(Fmt("unknown type %s in DataSeries::LogValueToString", type_name(val->type)));
|
||||||
|
return "cannot be reached";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
string DataSeries::GetDSFieldType(const threading::Field *field)
|
string DataSeries::GetDSFieldType(const threading::Field *field)
|
||||||
{
|
{
|
||||||
switch(field->type)
|
switch(field->type) {
|
||||||
{
|
|
||||||
case TYPE_BOOL:
|
case TYPE_BOOL:
|
||||||
return "bool";
|
return "bool";
|
||||||
|
|
||||||
|
@ -145,75 +144,49 @@ string DataSeries::GetDSFieldType(const threading::Field *field)
|
||||||
case TYPE_FILE:
|
case TYPE_FILE:
|
||||||
case TYPE_TABLE:
|
case TYPE_TABLE:
|
||||||
case TYPE_VECTOR:
|
case TYPE_VECTOR:
|
||||||
default:
|
case TYPE_FUNC:
|
||||||
return "variable32";
|
return "variable32";
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
string DataSeries::GetBroTypeString(const threading::Field *field)
|
|
||||||
{
|
|
||||||
switch(field->type)
|
|
||||||
{
|
|
||||||
case TYPE_BOOL:
|
|
||||||
return "bool";
|
|
||||||
case TYPE_COUNT:
|
|
||||||
return "count";
|
|
||||||
case TYPE_COUNTER:
|
|
||||||
return "counter";
|
|
||||||
case TYPE_PORT:
|
|
||||||
return "port";
|
|
||||||
case TYPE_INT:
|
|
||||||
return "int";
|
|
||||||
case TYPE_TIME:
|
|
||||||
return "time";
|
|
||||||
case TYPE_INTERVAL:
|
|
||||||
return "interval";
|
|
||||||
case TYPE_DOUBLE:
|
|
||||||
return "double";
|
|
||||||
case TYPE_SUBNET:
|
|
||||||
return "subnet";
|
|
||||||
case TYPE_ADDR:
|
|
||||||
return "addr";
|
|
||||||
case TYPE_ENUM:
|
|
||||||
return "enum";
|
|
||||||
case TYPE_STRING:
|
|
||||||
return "string";
|
|
||||||
case TYPE_FILE:
|
|
||||||
return "file";
|
|
||||||
case TYPE_TABLE:
|
|
||||||
return "table";
|
|
||||||
case TYPE_VECTOR:
|
|
||||||
return "vector";
|
|
||||||
default:
|
default:
|
||||||
return "???";
|
InternalError(Fmt("unknown type %s in DataSeries::GetDSFieldType", type_name(field->type)));
|
||||||
|
return "cannot be reached";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
string DataSeries::BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle)
|
string DataSeries::BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle)
|
||||||
{
|
|
||||||
if("" == sTitle)
|
|
||||||
{
|
{
|
||||||
|
if( ! sTitle.size() )
|
||||||
sTitle = "GenericBroStream";
|
sTitle = "GenericBroStream";
|
||||||
}
|
|
||||||
string xmlschema;
|
string xmlschema = "<ExtentType name=\""
|
||||||
xmlschema = "<ExtentType name=\"" + sTitle + "\" version=\"1.0\" namespace=\"bro-ids.org\">\n";
|
+ sTitle
|
||||||
for(size_t i = 0; i < vals.size(); ++i)
|
+ "\" version=\"1.0\" namespace=\"bro-ids.org\">\n";
|
||||||
|
|
||||||
|
for( size_t i = 0; i < vals.size(); ++i )
|
||||||
{
|
{
|
||||||
xmlschema += "\t<field type=\"" + vals[i].ds_type + "\" name=\"" + vals[i].field_name + "\" " + vals[i].field_options + "/>\n";
|
xmlschema += "\t<field type=\""
|
||||||
|
+ vals[i].ds_type
|
||||||
|
+ "\" name=\""
|
||||||
|
+ vals[i].field_name
|
||||||
|
+ "\" " + vals[i].field_options
|
||||||
|
+ "/>\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
xmlschema += "</ExtentType>\n";
|
xmlschema += "</ExtentType>\n";
|
||||||
for(size_t i = 0; i < vals.size(); ++i)
|
|
||||||
|
for( size_t i = 0; i < vals.size(); ++i )
|
||||||
{
|
{
|
||||||
xmlschema += "<!-- " + vals[i].field_name + " : " + vals[i].bro_type + " -->\n";
|
xmlschema += "<!-- " + vals[i].field_name
|
||||||
|
+ " : " + vals[i].bro_type
|
||||||
|
+ " -->\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
return xmlschema;
|
return xmlschema;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string DataSeries::GetDSOptionsForType(const threading::Field *field)
|
std::string DataSeries::GetDSOptionsForType(const threading::Field *field)
|
||||||
{
|
{
|
||||||
switch(field->type)
|
switch( field->type ) {
|
||||||
{
|
|
||||||
case TYPE_TIME:
|
case TYPE_TIME:
|
||||||
case TYPE_INTERVAL:
|
case TYPE_INTERVAL:
|
||||||
{
|
{
|
||||||
|
@ -233,6 +206,7 @@ std::string DataSeries::GetDSOptionsForType(const threading::Field *field)
|
||||||
case TYPE_TABLE:
|
case TYPE_TABLE:
|
||||||
case TYPE_VECTOR:
|
case TYPE_VECTOR:
|
||||||
return "pack_unique=\"yes\"";
|
return "pack_unique=\"yes\"";
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
|
@ -242,11 +216,13 @@ std::string DataSeries::GetDSOptionsForType(const threading::Field *field)
|
||||||
|
|
||||||
DataSeries::DataSeries(WriterFrontend* frontend) : WriterBackend(frontend)
|
DataSeries::DataSeries(WriterFrontend* frontend) : WriterBackend(frontend)
|
||||||
{
|
{
|
||||||
ds_compression = string((const char *)BifConst::LogDataSeries::compression->Bytes(), BifConst::LogDataSeries::compression->Len());
|
ds_compression = string((const char *)BifConst::LogDataSeries::compression->Bytes(),
|
||||||
|
BifConst::LogDataSeries::compression->Len());
|
||||||
ds_dump_schema = BifConst::LogDataSeries::dump_schema;
|
ds_dump_schema = BifConst::LogDataSeries::dump_schema;
|
||||||
ds_extent_size = BifConst::LogDataSeries::extent_size;
|
ds_extent_size = BifConst::LogDataSeries::extent_size;
|
||||||
ds_num_threads = BifConst::LogDataSeries::num_threads;
|
ds_num_threads = BifConst::LogDataSeries::num_threads;
|
||||||
ds_use_integer_for_time = BifConst::LogDataSeries::use_integer_for_time;
|
ds_use_integer_for_time = BifConst::LogDataSeries::use_integer_for_time;
|
||||||
|
ds_set_separator = ",";
|
||||||
}
|
}
|
||||||
|
|
||||||
DataSeries::~DataSeries()
|
DataSeries::~DataSeries()
|
||||||
|
@ -258,20 +234,23 @@ bool DataSeries::OpenLog(string path)
|
||||||
log_file = new DataSeriesSink(path + ".ds", compress_type);
|
log_file = new DataSeriesSink(path + ".ds", compress_type);
|
||||||
log_file->writeExtentLibrary(log_types);
|
log_file->writeExtentLibrary(log_types);
|
||||||
|
|
||||||
for(size_t i = 0; i < schema_list.size(); ++i)
|
for( size_t i = 0; i < schema_list.size(); ++i )
|
||||||
extents.insert(std::make_pair(schema_list[i].field_name, GeneralField::create(log_series, schema_list[i].field_name)));
|
extents.insert(std::make_pair(schema_list[i].field_name,
|
||||||
|
GeneralField::create(log_series, schema_list[i].field_name)));
|
||||||
|
|
||||||
if(ds_extent_size < ROW_MIN)
|
if ( ds_extent_size < ROW_MIN )
|
||||||
{
|
{
|
||||||
fprintf(stderr, "%d is not a valid value for 'rows'. Using min of %d instead.\n", (int)ds_extent_size, (int)ROW_MIN);
|
Warning(Fmt("%d is not a valid value for 'rows'. Using min of %d instead", (int)ds_extent_size, (int)ROW_MIN));
|
||||||
ds_extent_size = ROW_MIN;
|
ds_extent_size = ROW_MIN;
|
||||||
}
|
}
|
||||||
else if(ds_extent_size > ROW_MAX)
|
|
||||||
|
else if( ds_extent_size > ROW_MAX )
|
||||||
{
|
{
|
||||||
fprintf(stderr, "%d is not a valid value for 'rows'. Using max of %d instead.\n", (int)ds_extent_size, (int)ROW_MAX);
|
Warning(Fmt("%d is not a valid value for 'rows'. Using max of %d instead", (int)ds_extent_size, (int)ROW_MAX));
|
||||||
ds_extent_size = ROW_MAX;
|
ds_extent_size = ROW_MAX;
|
||||||
}
|
}
|
||||||
log_output = new OutputModule(*log_file, log_series, log_type, ds_extent_size);
|
|
||||||
|
log_output = new OutputModule(*log_file, log_series, *log_type, ds_extent_size);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -283,22 +262,22 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
|
||||||
// use that schema to build our output logfile and prepare it to be
|
// use that schema to build our output logfile and prepare it to be
|
||||||
// written to.
|
// written to.
|
||||||
|
|
||||||
// Note: compressor count must be set *BEFORE* DataSeriesSink is instantiated.
|
// Note: compressor count must be set *BEFORE* DataSeriesSink is
|
||||||
if(ds_num_threads < THREAD_MIN && ds_num_threads != 0)
|
// instantiated.
|
||||||
|
if( ds_num_threads < THREAD_MIN && ds_num_threads != 0 )
|
||||||
{
|
{
|
||||||
fprintf(stderr, "%d is too few threads! Using %d instead\n", (int)ds_num_threads, (int)THREAD_MIN);
|
Warning(Fmt("%d is too few threads! Using %d instead", (int)ds_num_threads, (int)THREAD_MIN));
|
||||||
ds_num_threads = THREAD_MIN;
|
ds_num_threads = THREAD_MIN;
|
||||||
}
|
}
|
||||||
if(ds_num_threads > THREAD_MAX)
|
|
||||||
|
if( ds_num_threads > THREAD_MAX )
|
||||||
{
|
{
|
||||||
fprintf(stderr, "%d is too many threads! Dropping back to %d\n", (int)ds_num_threads, (int)THREAD_MAX);
|
Warning(Fmt("%d is too many threads! Dropping back to %d", (int)ds_num_threads, (int)THREAD_MAX));
|
||||||
ds_num_threads = THREAD_MAX;
|
ds_num_threads = THREAD_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(ds_num_threads > 0)
|
if( ds_num_threads > 0 )
|
||||||
{
|
|
||||||
DataSeriesSink::setCompressorCount(ds_num_threads);
|
DataSeriesSink::setCompressorCount(ds_num_threads);
|
||||||
}
|
|
||||||
|
|
||||||
for ( int i = 0; i < num_fields; i++ )
|
for ( int i = 0; i < num_fields; i++ )
|
||||||
{
|
{
|
||||||
|
@ -307,65 +286,59 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
|
||||||
val.ds_type = GetDSFieldType(field);
|
val.ds_type = GetDSFieldType(field);
|
||||||
val.field_name = string(field->name);
|
val.field_name = string(field->name);
|
||||||
val.field_options = GetDSOptionsForType(field);
|
val.field_options = GetDSOptionsForType(field);
|
||||||
val.bro_type = GetBroTypeString(field);
|
val.bro_type = field->TypeName();
|
||||||
schema_list.push_back(val);
|
schema_list.push_back(val);
|
||||||
}
|
}
|
||||||
|
|
||||||
string schema = BuildDSSchemaFromFieldTypes(schema_list, path);
|
string schema = BuildDSSchemaFromFieldTypes(schema_list, path);
|
||||||
if(ds_dump_schema)
|
|
||||||
|
if( ds_dump_schema )
|
||||||
{
|
{
|
||||||
FILE * pFile;
|
FILE* pFile = fopen ( string(path + ".ds.xml").c_str() , "wb" );
|
||||||
pFile = fopen ( string(path + ".ds.xml").c_str() , "wb" );
|
|
||||||
if(NULL == pFile)
|
if( pFile )
|
||||||
{
|
{
|
||||||
perror("Could not dump schema");
|
fwrite(schema.c_str(), 1, schema.length(), pFile);
|
||||||
}
|
fclose(pFile);
|
||||||
fwrite (schema.c_str(), 1 , schema.length() , pFile );
|
|
||||||
fclose (pFile);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
compress_type = Extent::compress_all;
|
|
||||||
|
|
||||||
if(ds_compression == "lzf")
|
|
||||||
{
|
|
||||||
compress_type = Extent::compress_lzf;
|
|
||||||
}
|
|
||||||
else if(ds_compression == "lzo")
|
|
||||||
{
|
|
||||||
compress_type = Extent::compress_lzo;
|
|
||||||
}
|
|
||||||
else if(ds_compression == "gz")
|
|
||||||
{
|
|
||||||
compress_type = Extent::compress_gz;
|
|
||||||
}
|
|
||||||
else if(ds_compression == "bz2")
|
|
||||||
{
|
|
||||||
compress_type = Extent::compress_bz2;
|
|
||||||
}
|
|
||||||
else if(ds_compression == "none")
|
|
||||||
{
|
|
||||||
compress_type = Extent::compress_none;
|
|
||||||
}
|
|
||||||
else if(ds_compression == "any")
|
|
||||||
{
|
|
||||||
compress_type = Extent::compress_all;
|
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
Error(Fmt("cannot dump schema: %s", strerror(errno)));
|
||||||
fprintf(stderr, "%s is not a valid compression type. Valid types are: 'lzf', 'lzo', 'gz', 'bz2', 'none', 'any'\n", ds_compression.c_str());
|
|
||||||
fprintf(stderr, "Defaulting to 'any'\n");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
compress_type = Extent::compress_all;
|
||||||
|
|
||||||
|
if( ds_compression == "lzf" )
|
||||||
|
compress_type = Extent::compress_lzf;
|
||||||
|
|
||||||
|
else if( ds_compression == "lzo" )
|
||||||
|
compress_type = Extent::compress_lzo;
|
||||||
|
|
||||||
|
else if( ds_compression == "gz" )
|
||||||
|
compress_type = Extent::compress_gz;
|
||||||
|
|
||||||
|
else if( ds_compression == "bz2" )
|
||||||
|
compress_type = Extent::compress_bz2;
|
||||||
|
|
||||||
|
else if( ds_compression == "none" )
|
||||||
|
compress_type = Extent::compress_none;
|
||||||
|
|
||||||
|
else if( ds_compression == "any" )
|
||||||
|
compress_type = Extent::compress_all;
|
||||||
|
|
||||||
|
else
|
||||||
|
Warning(Fmt("%s is not a valid compression type. Valid types are: 'lzf', 'lzo', 'gz', 'bz2', 'none', 'any'. Defaulting to 'any'", ds_compression.c_str()));
|
||||||
|
|
||||||
log_type = const_cast<ExtentType *>(log_types.registerType(schema));
|
log_type = const_cast<ExtentType *>(log_types.registerType(schema));
|
||||||
|
|
||||||
log_series.setType(*log_type);
|
log_series.setType(*log_type);
|
||||||
|
|
||||||
return OpenLog(path);
|
return OpenLog(path);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DataSeries::DoFlush()
|
bool DataSeries::DoFlush()
|
||||||
{
|
{
|
||||||
// Flushing is handled by DataSeries automatically, so this function doesn't do anything.
|
// Flushing is handled by DataSeries automatically, so this function
|
||||||
|
// doesn't do anything.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -377,7 +350,7 @@ void DataSeries::CloseLog()
|
||||||
extents.clear();
|
extents.clear();
|
||||||
|
|
||||||
// Don't delete the file before you delete the output, or bad things
|
// Don't delete the file before you delete the output, or bad things
|
||||||
// happen.
|
// will happen.
|
||||||
delete log_output;
|
delete log_output;
|
||||||
delete log_file;
|
delete log_file;
|
||||||
|
|
||||||
|
@ -396,14 +369,17 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
|
||||||
threading::Value** vals)
|
threading::Value** vals)
|
||||||
{
|
{
|
||||||
log_output->newRecord();
|
log_output->newRecord();
|
||||||
for(size_t i = 0; i < (size_t)num_fields; ++i)
|
|
||||||
|
for( size_t i = 0; i < (size_t)num_fields; ++i )
|
||||||
{
|
{
|
||||||
ExtentIterator iter = extents.find(fields[i]->name);
|
ExtentIterator iter = extents.find(fields[i]->name);
|
||||||
assert(iter != extents.end());
|
assert(iter != extents.end());
|
||||||
|
|
||||||
if( iter != extents.end() )
|
if( iter != extents.end() )
|
||||||
{
|
{
|
||||||
GeneralField *cField = iter->second;
|
GeneralField *cField = iter->second;
|
||||||
if(vals[i]->present)
|
|
||||||
|
if( vals[i]->present )
|
||||||
cField->set(LogValueToString(vals[i]));
|
cField->set(LogValueToString(vals[i]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -413,7 +389,8 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
|
||||||
|
|
||||||
bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating)
|
bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating)
|
||||||
{
|
{
|
||||||
// Note that if DS files are rotated too often, the aggregate log size will be (much) larger.
|
// Note that if DS files are rotated too often, the aggregate log
|
||||||
|
// size will be (much) larger.
|
||||||
CloseLog();
|
CloseLog();
|
||||||
|
|
||||||
string dsname = Path() + ".ds";
|
string dsname = Path() + ".ds";
|
||||||
|
|
|
@ -6,13 +6,13 @@
|
||||||
#ifndef LOGGING_WRITER_DATA_SERIES_H
|
#ifndef LOGGING_WRITER_DATA_SERIES_H
|
||||||
#define LOGGING_WRITER_DATA_SERIES_H
|
#define LOGGING_WRITER_DATA_SERIES_H
|
||||||
|
|
||||||
#include "../WriterBackend.h"
|
|
||||||
|
|
||||||
#include <DataSeries/ExtentType.hpp>
|
#include <DataSeries/ExtentType.hpp>
|
||||||
#include <DataSeries/DataSeriesFile.hpp>
|
#include <DataSeries/DataSeriesFile.hpp>
|
||||||
#include <DataSeries/DataSeriesModule.hpp>
|
#include <DataSeries/DataSeriesModule.hpp>
|
||||||
#include <DataSeries/GeneralField.hpp>
|
#include <DataSeries/GeneralField.hpp>
|
||||||
|
|
||||||
|
#include "../WriterBackend.h"
|
||||||
|
|
||||||
namespace logging { namespace writer {
|
namespace logging { namespace writer {
|
||||||
|
|
||||||
class DataSeries : public WriterBackend {
|
class DataSeries : public WriterBackend {
|
||||||
|
@ -24,6 +24,8 @@ public:
|
||||||
{ return new DataSeries(frontend); }
|
{ return new DataSeries(frontend); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
// Overidden from WriterBackend.
|
||||||
|
|
||||||
virtual bool DoInit(string path, int num_fields,
|
virtual bool DoInit(string path, int num_fields,
|
||||||
const threading::Field* const * fields);
|
const threading::Field* const * fields);
|
||||||
|
|
||||||
|
@ -85,18 +87,10 @@ private:
|
||||||
*/
|
*/
|
||||||
string BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle);
|
string BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle);
|
||||||
|
|
||||||
/**
|
|
||||||
* Takes a field type and converts it to a readable string.
|
|
||||||
*
|
|
||||||
* @param field We extract the type from this and convert it into a readable string.
|
|
||||||
* @return String representation of the field's type
|
|
||||||
*/
|
|
||||||
string GetBroTypeString(const threading::Field *field);
|
|
||||||
|
|
||||||
/** Closes the currently open file. */
|
/** Closes the currently open file. */
|
||||||
void CloseLog();
|
void CloseLog();
|
||||||
|
|
||||||
/** XXX */
|
/** Opens a new file. */
|
||||||
bool OpenLog(string path);
|
bool OpenLog(string path);
|
||||||
|
|
||||||
typedef std::map<string, GeneralField *> ExtentMap;
|
typedef std::map<string, GeneralField *> ExtentMap;
|
||||||
|
@ -119,6 +113,7 @@ private:
|
||||||
string ds_compression;
|
string ds_compression;
|
||||||
bool ds_dump_schema;
|
bool ds_dump_schema;
|
||||||
bool ds_use_integer_for_time;
|
bool ds_use_integer_for_time;
|
||||||
|
string ds_set_separator;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,20 @@ bool Field::Write(SerializationFormat* fmt) const
|
||||||
return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype"));
|
return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
string Field::TypeName() const
|
||||||
|
{
|
||||||
|
string n = type_name(type);
|
||||||
|
|
||||||
|
if ( (type == TYPE_TABLE) || (type == TYPE_VECTOR) )
|
||||||
|
{
|
||||||
|
n += "[";
|
||||||
|
n += type_name(subtype);
|
||||||
|
n += "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
Value::~Value()
|
Value::~Value()
|
||||||
{
|
{
|
||||||
if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC)
|
if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC)
|
||||||
|
|
|
@ -53,6 +53,12 @@ struct Field {
|
||||||
* @return False if an error occured.
|
* @return False if an error occured.
|
||||||
*/
|
*/
|
||||||
bool Write(SerializationFormat* fmt) const;
|
bool Write(SerializationFormat* fmt) const;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a textual description of the field's type. This method is
|
||||||
|
* thread-safe.
|
||||||
|
*/
|
||||||
|
string TypeName() const;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -132,8 +138,8 @@ struct Value {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if the type can be represented by a Value. If
|
* Returns true if the type can be represented by a Value. If
|
||||||
* `atomic_only` is true, will not permit composite types.
|
* `atomic_only` is true, will not permit composite types. This
|
||||||
*/
|
* method is thread-safe. */
|
||||||
static bool IsCompatibleType(BroType* t, bool atomic_only=false);
|
static bool IsCompatibleType(BroType* t, bool atomic_only=false);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue