Working on DataSeries support.

- The option to use integers instead of doubles was ignored.

   - Renaming script-level options to remove the ds_ prefix.

   - Log rotation didn't work.

   - A set of simple unit tests.
This commit is contained in:
Robin Sommer 2012-04-09 17:30:57 -07:00
parent 952b6b293a
commit 7131feefbc
16 changed files with 1001 additions and 128 deletions

View file

@ -14,78 +14,6 @@
using namespace logging;
using namespace writer;
// NOTE: Naming conventions are a little bit scattershot at the moment.
// Within the scope of this file, a function name prefixed by '_' denotes a
// static function.
// ************************ LOCAL PROTOTYPES *********************************
struct SchemaValue;
/**
* Turns a log value into a std::string. Uses an ostringstream to do the
* heavy lifting, but still needs to switch on the type to know which value
* in the union to give to the string stream for processing.
*
* @param val The value we wish to convert to a string
* @return the string value of val
*/
static std::string _LogValueToString(threading::Value* val);
/**
* Takes a field type and converts it to a relevant DataSeries type.
*
* @param field We extract the type from this and convert it into a relevant DS type.
* @return String representation of type that DataSeries can understand.
*/
static string _GetDSFieldType(const threading::Field* field);
/**
* Takes a field type and converts it to a readable string.
*
* @param field We extract the type from this and convert it into a readable string.
* @return String representation of the field's type
*/
static string _GetBroTypeString(const threading::Field *field);
/**
* Takes a list of types, a list of names, and a title, and uses it to construct a valid DataSeries XML schema
* thing, which is then returned as a std::string
*
* @param opts std::vector of strings containing a list of options to be appended to each field (e.g. "pack_relative=yes")
* @param sTitle Name of this schema. Ideally, these schemas would be aggregated and re-used.
*/
static string _BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle);
/**
* Are there any options we should put into the XML schema?
*
* @param field We extract the type from this and return any options that make sense for that type.
* @return Options that can be added directly to the XML (e.g. "pack_relative=\"yes\"")
*/
static std::string _GetDSOptionsForType(const threading::Field *field);
/**
* Internal helper structure; populate a vector of these which is passed to the XML generator for its use.
*/
struct SchemaValue
{
	string ds_type;        // DataSeries type of the field.
	string bro_type;       // Readable Bro type of the field.
	string field_name;     // Name of the field.
	string field_options;  // Extra XML options for the field.

	// Fills in all four members from the given field description.
	SchemaValue(const threading::Field *field)
		: field_name(field->name)
	{
		ds_type = _GetDSFieldType(field);
		field_options = _GetDSOptionsForType(field);
		bro_type = _GetBroTypeString(field);
	}
};
// ************************ LOCAL IMPL *********************************
std::string DataSeries::LogValueToString(threading::Value *val)
{
const int strsz = 1024;
@ -127,7 +55,11 @@ std::string DataSeries::LogValueToString(threading::Value *val)
// in the near-term, this *should* lead to better pack_relative (and thus smaller output files).
case TYPE_TIME:
case TYPE_INTERVAL:
ostr << (unsigned long)(DataSeries::TIME_SCALE * val->val.double_val);
if ( ds_use_integer_for_time )
ostr << (unsigned long)(DataSeries::TIME_SCALE * val->val.double_val);
else
ostr << val->val.double_val;
return ostr.str();
case TYPE_DOUBLE:
@ -186,7 +118,7 @@ std::string DataSeries::LogValueToString(threading::Value *val)
}
}
static string _GetDSFieldType(const threading::Field *field)
string DataSeries::GetDSFieldType(const threading::Field *field)
{
switch(field->type)
{
@ -197,13 +129,15 @@ static string _GetDSFieldType(const threading::Field *field)
case TYPE_COUNTER:
case TYPE_PORT:
case TYPE_INT:
case TYPE_TIME:
case TYPE_INTERVAL:
return "int64";
case TYPE_DOUBLE:
return "double";
case TYPE_TIME:
case TYPE_INTERVAL:
return ds_use_integer_for_time ? "int64" : "double";
case TYPE_SUBNET:
case TYPE_ADDR:
case TYPE_ENUM:
@ -217,7 +151,7 @@ static string _GetDSFieldType(const threading::Field *field)
}
}
static string _GetBroTypeString(const threading::Field *field)
string DataSeries::GetBroTypeString(const threading::Field *field)
{
switch(field->type)
{
@ -256,7 +190,7 @@ static string _GetBroTypeString(const threading::Field *field)
}
}
static string _BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle)
string DataSeries::BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle)
{
if("" == sTitle)
{
@ -276,13 +210,21 @@ static string _BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, stri
return xmlschema;
}
static std::string _GetDSOptionsForType(const threading::Field *field)
std::string DataSeries::GetDSOptionsForType(const threading::Field *field)
{
switch(field->type)
{
case TYPE_TIME:
case TYPE_INTERVAL:
return "pack_relative=\"" + std::string(field->name) + "\"";
{
std::string s = "pack_relative=\"" + std::string(field->name) + "\"";
if ( ! ds_use_integer_for_time )
s += " pack_scale=\"1000000\"";
return s;
}
case TYPE_SUBNET:
case TYPE_ADDR:
case TYPE_ENUM:
@ -300,16 +242,40 @@ static std::string _GetDSOptionsForType(const threading::Field *field)
/**
 * Constructs the writer and caches the script-level LogDataSeries options
 * (compression, schema dumping, extent size, number of compressor threads,
 * and whether times are stored as integers) in member variables.
 *
 * @param frontend Passed through to the WriterBackend base class.
 */
DataSeries::DataSeries(WriterFrontend* frontend) : WriterBackend(frontend)
{
ds_compression = string((const char *)BifConst::LogDataSeries::ds_compression->Bytes(), BifConst::LogDataSeries::ds_compression->Len());
ds_dump_schema = BifConst::LogDataSeries::ds_dump_schema;
ds_extent_size = BifConst::LogDataSeries::ds_extent_size;
ds_num_threads = BifConst::LogDataSeries::ds_num_threads;
ds_compression = string((const char *)BifConst::LogDataSeries::compression->Bytes(), BifConst::LogDataSeries::compression->Len());
ds_dump_schema = BifConst::LogDataSeries::dump_schema;
ds_extent_size = BifConst::LogDataSeries::extent_size;
ds_num_threads = BifConst::LogDataSeries::num_threads;
ds_use_integer_for_time = BifConst::LogDataSeries::use_integer_for_time;
}
/**
 * Destructor. Releases nothing itself; the extents, the output module, and
 * the log file are freed by CloseLog(), which DoFinish() calls.
 */
DataSeries::~DataSeries()
{
}
/**
 * Opens the DataSeries output file for a log and prepares it for writing.
 *
 * Creates the sink for "<path>.ds" using compress_type, writes the extent
 * type library to it, creates one GeneralField per entry in schema_list
 * (stored in extents, keyed by field name), clamps ds_extent_size into
 * [ROW_MIN, ROW_MAX], and finally creates the output module.
 *
 * @param path Path of the log, without the ".ds" extension.
 * @return Always true.
 */
bool DataSeries::OpenLog(string path)
{
	log_file = new DataSeriesSink(path + ".ds", compress_type);
	log_file->writeExtentLibrary(log_types);

	for(size_t i = 0; i < schema_list.size(); ++i)
		extents.insert(std::make_pair(schema_list[i].field_name, GeneralField::create(log_series, schema_list[i].field_name)));

	// ds_extent_size is a 64-bit unsigned value. Print it (and the limits)
	// as unsigned long long; squeezing it through an int would truncate
	// exactly the out-of-range values this warning is meant to report.
	if(ds_extent_size < ROW_MIN)
	{
		fprintf(stderr, "%llu is not a valid value for 'rows'. Using min of %llu instead.\n",
		        static_cast<unsigned long long>(ds_extent_size), static_cast<unsigned long long>(ROW_MIN));
		ds_extent_size = ROW_MIN;
	}
	else if(ds_extent_size > ROW_MAX)
	{
		fprintf(stderr, "%llu is not a valid value for 'rows'. Using max of %llu instead.\n",
		        static_cast<unsigned long long>(ds_extent_size), static_cast<unsigned long long>(ROW_MAX));
		ds_extent_size = ROW_MAX;
	}

	log_output = new OutputModule(*log_file, log_series, log_type, ds_extent_size);

	return true;
}
bool DataSeries::DoInit(string path, int num_fields, const threading::Field* const * fields)
{
// We first construct an XML schema thing (and, if ds_dump_schema is
@ -333,14 +299,18 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
{
DataSeriesSink::setCompressorCount(ds_num_threads);
}
vector<SchemaValue> schema_list;
for ( int i = 0; i < num_fields; i++ )
{
const threading::Field* field = fields[i];
SchemaValue val(field);
SchemaValue val;
val.ds_type = GetDSFieldType(field);
val.field_name = string(field->name);
val.field_options = GetDSOptionsForType(field);
val.bro_type = GetBroTypeString(field);
schema_list.push_back(val);
}
string schema = _BuildDSSchemaFromFieldTypes(schema_list, path);
string schema = BuildDSSchemaFromFieldTypes(schema_list, path);
if(ds_dump_schema)
{
FILE * pFile;
@ -353,7 +323,7 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
fclose (pFile);
}
int compress_type = Extent::compress_all;
compress_type = Extent::compress_all;
if(ds_compression == "lzf")
{
@ -385,28 +355,11 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
fprintf(stderr, "Defaulting to 'any'\n");
}
log_type = const_cast<ExtentType *>(log_types.registerType(schema));
log_type = const_cast<ExtentType *>(log_types.registerType(schema));
log_series.setType(*log_type);
log_file = new DataSeriesSink(path + ".ds", compress_type);
log_file->writeExtentLibrary(log_types);
for(size_t i = 0; i < schema_list.size(); ++i)
extents.insert(std::make_pair(schema_list[i].field_name, GeneralField::create(log_series, schema_list[i].field_name)));
if(ds_extent_size < ROW_MIN)
{
fprintf(stderr, "%d is not a valid value for 'rows'. Using min of %d instead.\n", (int)ds_extent_size, (int)ROW_MIN);
ds_extent_size = ROW_MIN;
}
else if(ds_extent_size > ROW_MAX)
{
fprintf(stderr, "%d is not a valid value for 'rows'. Using max of %d instead.\n", (int)ds_extent_size, (int)ROW_MAX);
ds_extent_size = ROW_MAX;
}
log_output = new OutputModule(*log_file, log_series, log_type, ds_extent_size);
return true;
return OpenLog(path);
}
@ -416,18 +369,26 @@ bool DataSeries::DoFlush()
return true;
}
bool DataSeries::DoFinish()
{
for(ExtentIterator iter = extents.begin();
iter != extents.end(); ++iter)
{
/**
 * Closes the currently open log: deletes every GeneralField stored in
 * extents and clears the map, then deletes the output module followed by
 * the log file, and resets both pointers to 0.
 */
void DataSeries::CloseLog()
{
for( ExtentIterator iter = extents.begin(); iter != extents.end(); ++iter )
delete iter->second;
}
extents.clear();
// Don't delete the file before you delete the output, or bad things happen.
// Don't delete the file before you delete the output, or bad things
// happen.
delete log_output;
delete log_file;
log_output = 0;
log_file = 0;
}
/**
 * Finishes the writer: closes the currently open log, then lets the
 * WriterBackend base class finish.
 *
 * @return The result of WriterBackend::DoFinish().
 */
bool DataSeries::DoFinish()
{
	CloseLog();

	const bool finished = WriterBackend::DoFinish();
	return finished;
}
@ -453,8 +414,7 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating)
{
// Note that if DS files are rotated too often, the aggregate log size will be (much) larger.
DoFinish();
CloseLog();
string dsname = Path() + ".ds";
string nname = rotated_path + ".ds";
@ -466,7 +426,7 @@ bool DataSeries::DoRotate(string rotated_path, double open, double close, bool t
return false;
}
return DoInit(Path(), NumFields(), Fields());
return OpenLog(Path());
}
bool DataSeries::DoSetBuf(bool enabled)

View file

@ -42,24 +42,83 @@ private:
static const size_t THREAD_MAX = 128; // Maximum number of compression threads that DataSeries may spawn.
static const size_t TIME_SCALE = 1000000; // Fixed-point multiplier for time values when converted to integers.
/** Per-field schema information handed to BuildDSSchemaFromFieldTypes(). */
struct SchemaValue
{
string ds_type; // DataSeries type, as returned by GetDSFieldType().
string bro_type; // Readable Bro type, as returned by GetBroTypeString().
string field_name; // Name of the field.
string field_options; // Extra XML options, as returned by GetDSOptionsForType().
};
/**
* Turns a log value into a std::string. Uses an ostringstream to do the
* heavy lifting, but still needs to switch on the type to know which value
* in the union to give to the string stream for processing.
*
* @param val The value we wish to convert to a string
* @return the string value of val
*/
std::string LogValueToString(threading::Value *val);
/**
* Takes a field type and converts it to a relevant DataSeries type.
*
* @param field We extract the type from this and convert it into a relevant DS type.
* @return String representation of type that DataSeries can understand.
*/
string GetDSFieldType(const threading::Field *field);
/**
* Are there any options we should put into the XML schema?
*
* @param field We extract the type from this and return any options that make sense for that type.
* @return Options that can be added directly to the XML (e.g. "pack_relative=\"yes\"")
*/
std::string GetDSOptionsForType(const threading::Field *field);
/**
* Takes a list of schema values (one per field, each carrying the field's types, name, and options) and a title,
* and uses them to construct a valid DataSeries XML schema, which is then returned as a std::string.
*
* @param vals std::vector of SchemaValue entries, one per field; each entry's options (e.g. "pack_relative=yes") are appended to its field
* @param sTitle Name of this schema. Ideally, these schemas would be aggregated and re-used.
* @return The XML schema as a string.
*/
string BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle);
/**
* Takes a field type and converts it to a readable string.
*
* @param field We extract the type from this and convert it into a readable string.
* @return String representation of the field's type
*/
string GetBroTypeString(const threading::Field *field);
/** Closes the currently open file. */
void CloseLog();
/** Opens a new output file for the given path (".ds" is appended) and sets up the output module. */
bool OpenLog(string path);
typedef std::map<string, GeneralField *> ExtentMap;
typedef ExtentMap::iterator ExtentIterator;
// Internal DataSeries structures we need to keep track of.
DataSeriesSink* log_file;
vector<SchemaValue> schema_list;
ExtentTypeLibrary log_types;
ExtentType *log_type;
ExtentSeries log_series;
OutputModule* log_output;
ExtentMap extents;
int compress_type;
DataSeriesSink* log_file;
OutputModule* log_output;
// Options set from the script-level.
uint64 ds_extent_size;
uint64 ds_num_threads;
string ds_compression;
bool ds_dump_schema;
bool ds_use_integer_for_time;
};
}