diff --git a/src/Type.cc b/src/Type.cc index 82221303af..d688b15376 100644 --- a/src/Type.cc +++ b/src/Type.cc @@ -15,10 +15,9 @@ extern int generate_documentation; +// Note: This function must be thread-safe. const char* type_name(TypeTag t) { - static char errbuf[512]; - static const char* type_names[int(NUM_TYPES)] = { "void", "bool", "int", "count", "counter", @@ -37,10 +36,7 @@ const char* type_name(TypeTag t) }; if ( int(t) >= NUM_TYPES ) - { - snprintf(errbuf, sizeof(errbuf), "%d: not a type tag", int(t)); - return errbuf; - } + return "type_name(): not a type tag"; return type_names[int(t)]; } diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 2f25ac418f..3a35eea380 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -86,6 +86,9 @@ bool Ascii::DoInit(string path, int num_fields, const Field* const * fields) if ( include_header ) { + string names; + string types; + string str = string(header_prefix, header_prefix_len) + "separator " // Always use space as separator here. + get_escaped_string(string(separator, separator_len), false) @@ -103,9 +106,6 @@ bool Ascii::DoInit(string path, int num_fields, const Field* const * fields) WriteHeaderField("path", get_escaped_string(path, false))) ) goto write_error; - string names; - string types; - for ( int i = 0; i < num_fields; ++i ) { if ( i > 0 ) @@ -114,15 +114,8 @@ bool Ascii::DoInit(string path, int num_fields, const Field* const * fields) types += string(separator, separator_len); } - const Field* field = fields[i]; - names += field->name; - types += type_name(field->type); - if ( (field->type == TYPE_TABLE) || (field->type == TYPE_VECTOR) ) - { - types += "["; - types += type_name(field->subtype); - types += "]"; - } + names += fields[i]->name; + types += fields[i]->TypeName(); } if ( ! (WriteHeaderField("fields", names) diff --git a/src/logging/writers/DataSeries.cc b/src/logging/writers/DataSeries.cc index 5ee8a812da..f6b26dc494 100644 --- a/src/logging/writers/DataSeries.cc +++ b/src/logging/writers/DataSeries.cc @@ -15,17 +15,15 @@ using namespace logging; using namespace writer; std::string DataSeries::LogValueToString(threading::Value *val) -{ - const int strsz = 1024; - char strbuf[strsz]; - - // In some cases, no value is attached. If this is the case, return an empty string. - if(!val->present) + { + // In some cases, no value is attached. If this is the case, return + // an empty string. + if( ! val->present ) return ""; std::ostringstream ostr; - switch(val->type) - { + + switch(val->type) { case TYPE_BOOL: return (val->val.int_val ? "true" : "false"); @@ -40,19 +38,22 @@ std::string DataSeries::LogValueToString(threading::Value *val) return ostr.str(); case TYPE_SUBNET: - ostr << Render(val->val.subnet_val); + ostr << Render(val->val.subnet_val); return ostr.str(); case TYPE_ADDR: - ostr << Render(val->val.addr_val); + ostr << Render(val->val.addr_val); return ostr.str(); - // Note: These two cases are relatively special. We need to convert these values into their integer equivalents - // to maximize precision. At the moment, there won't be a noticeable effect (Bro uses the double format everywhere - // internally, so we've already lost the precision we'd gain here), but timestamps may eventually switch to this - // representation within Bro. + // Note: These two cases are relatively special. We need to convert + // these values into their integer equivalents to maximize precision. + // At the moment, there won't be a noticeable effect (Bro uses the + // double format everywhere internally, so we've already lost the + // precision we'd gain here), but timestamps may eventually switch to + // this representation within Bro. // - // in the near-term, this *should* lead to better pack_relative (and thus smaller output files). + // In the near-term, this *should* lead to better pack_relative (and + // thus smaller output files). case TYPE_TIME: case TYPE_INTERVAL: if ( ds_use_integer_for_time ) @@ -69,59 +70,57 @@ std::string DataSeries::LogValueToString(threading::Value *val) case TYPE_ENUM: case TYPE_STRING: case TYPE_FILE: - { - int size = val->val.string_val->size(); - string tmpString = ""; - if(size) - tmpString = string(val->val.string_val->data(), val->val.string_val->size()); - else - tmpString = string(""); - return tmpString; - } - case TYPE_TABLE: - { - if ( ! val->val.set_val.size ) - { + case TYPE_FUNC: + if ( ! val->val.string_val->size() ) + return ""; + + return string(val->val.string_val->data(), val->val.string_val->size()); + + case TYPE_TABLE: + { + if ( ! val->val.set_val.size ) return ""; - } string tmpString = ""; + for ( int j = 0; j < val->val.set_val.size; j++ ) { if ( j > 0 ) - tmpString += ":"; //TODO: Specify set separator char in configuration. + tmpString += ds_set_separator; tmpString += LogValueToString(val->val.set_val.vals[j]); } + return tmpString; - } + } + case TYPE_VECTOR: - { + { if ( ! val->val.vector_val.size ) - { return ""; - } string tmpString = ""; + for ( int j = 0; j < val->val.vector_val.size; j++ ) { if ( j > 0 ) - tmpString += ":"; //TODO: Specify set separator char in configuration. + tmpString += ds_set_separator; tmpString += LogValueToString(val->val.vector_val.vals[j]); } return tmpString; - } + } + default: - return "???"; + InternalError(Fmt("unknown type %s in DataSeries::LogValueToString", type_name(val->type))); + return "cannot be reached"; } } string DataSeries::GetDSFieldType(const threading::Field *field) { - switch(field->type) - { + switch(field->type) { case TYPE_BOOL: return "bool"; @@ -145,75 +144,49 @@ string DataSeries::GetDSFieldType(const threading::Field *field) case TYPE_FILE: case TYPE_TABLE: case TYPE_VECTOR: - default: + case TYPE_FUNC: return "variable32"; - } -} - -string DataSeries::GetBroTypeString(const threading::Field *field) -{ - switch(field->type) - { - case TYPE_BOOL: - return "bool"; - case TYPE_COUNT: - return "count"; - case TYPE_COUNTER: - return "counter"; - case TYPE_PORT: - return "port"; - case TYPE_INT: - return "int"; - case TYPE_TIME: - return "time"; - case TYPE_INTERVAL: - return "interval"; - case TYPE_DOUBLE: - return "double"; - case TYPE_SUBNET: - return "subnet"; - case TYPE_ADDR: - return "addr"; - case TYPE_ENUM: - return "enum"; - case TYPE_STRING: - return "string"; - case TYPE_FILE: - return "file"; - case TYPE_TABLE: - return "table"; - case TYPE_VECTOR: - return "vector"; default: - return "???"; + InternalError(Fmt("unknown type %s in DataSeries::GetDSFieldType", type_name(field->type))); + return "cannot be reached"; } } string DataSeries::BuildDSSchemaFromFieldTypes(const vector& vals, string sTitle) -{ - if("" == sTitle) - { + { + if( ! sTitle.size() ) sTitle = "GenericBroStream"; - } - string xmlschema; - xmlschema = "\n"; - for(size_t i = 0; i < vals.size(); ++i) + + string xmlschema = "\n"; + + for( size_t i = 0; i < vals.size(); ++i ) { - xmlschema += "\t\n"; + xmlschema += "\t\n"; } + xmlschema += "\n"; - for(size_t i = 0; i < vals.size(); ++i) + + for( size_t i = 0; i < vals.size(); ++i ) { - xmlschema += "\n"; + xmlschema += "\n"; } + return xmlschema; } std::string DataSeries::GetDSOptionsForType(const threading::Field *field) { - switch(field->type) - { + switch( field->type ) { case TYPE_TIME: case TYPE_INTERVAL: { @@ -233,6 +206,7 @@ std::string DataSeries::GetDSOptionsForType(const threading::Field *field) case TYPE_TABLE: case TYPE_VECTOR: return "pack_unique=\"yes\""; + default: return ""; } @@ -242,11 +216,13 @@ std::string DataSeries::GetDSOptionsForType(const threading::Field *field) DataSeries::DataSeries(WriterFrontend* frontend) : WriterBackend(frontend) { - ds_compression = string((const char *)BifConst::LogDataSeries::compression->Bytes(), BifConst::LogDataSeries::compression->Len()); + ds_compression = string((const char *)BifConst::LogDataSeries::compression->Bytes(), + BifConst::LogDataSeries::compression->Len()); ds_dump_schema = BifConst::LogDataSeries::dump_schema; ds_extent_size = BifConst::LogDataSeries::extent_size; ds_num_threads = BifConst::LogDataSeries::num_threads; ds_use_integer_for_time = BifConst::LogDataSeries::use_integer_for_time; + ds_set_separator = ","; } DataSeries::~DataSeries() @@ -258,20 +234,23 @@ bool DataSeries::OpenLog(string path) log_file = new DataSeriesSink(path + ".ds", compress_type); log_file->writeExtentLibrary(log_types); - for(size_t i = 0; i < schema_list.size(); ++i) - extents.insert(std::make_pair(schema_list[i].field_name, GeneralField::create(log_series, schema_list[i].field_name))); + for( size_t i = 0; i < schema_list.size(); ++i ) + extents.insert(std::make_pair(schema_list[i].field_name, + GeneralField::create(log_series, schema_list[i].field_name))); - if(ds_extent_size < ROW_MIN) + if ( ds_extent_size < ROW_MIN ) { - fprintf(stderr, "%d is not a valid value for 'rows'. Using min of %d instead.\n", (int)ds_extent_size, (int)ROW_MIN); - ds_extent_size = ROW_MIN; + Warning(Fmt("%d is not a valid value for 'rows'. Using min of %d instead", (int)ds_extent_size, (int)ROW_MIN)); + ds_extent_size = ROW_MIN; } - else if(ds_extent_size > ROW_MAX) + + else if( ds_extent_size > ROW_MAX ) { - fprintf(stderr, "%d is not a valid value for 'rows'. Using max of %d instead.\n", (int)ds_extent_size, (int)ROW_MAX); - ds_extent_size = ROW_MAX; + Warning(Fmt("%d is not a valid value for 'rows'. Using max of %d instead", (int)ds_extent_size, (int)ROW_MAX)); + ds_extent_size = ROW_MAX; } - log_output = new OutputModule(*log_file, log_series, log_type, ds_extent_size); + + log_output = new OutputModule(*log_file, log_series, *log_type, ds_extent_size); return true; } @@ -283,22 +262,22 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con // use that schema to build our output logfile and prepare it to be // written to. - // Note: compressor count must be set *BEFORE* DataSeriesSink is instantiated. - if(ds_num_threads < THREAD_MIN && ds_num_threads != 0) + // Note: compressor count must be set *BEFORE* DataSeriesSink is + // instantiated. + if( ds_num_threads < THREAD_MIN && ds_num_threads != 0 ) { - fprintf(stderr, "%d is too few threads! Using %d instead\n", (int)ds_num_threads, (int)THREAD_MIN); + Warning(Fmt("%d is too few threads! Using %d instead", (int)ds_num_threads, (int)THREAD_MIN)); ds_num_threads = THREAD_MIN; } - if(ds_num_threads > THREAD_MAX) + + if( ds_num_threads > THREAD_MAX ) { - fprintf(stderr, "%d is too many threads! Dropping back to %d\n", (int)ds_num_threads, (int)THREAD_MAX); + Warning(Fmt("%d is too many threads! Dropping back to %d", (int)ds_num_threads, (int)THREAD_MAX)); ds_num_threads = THREAD_MAX; } - if(ds_num_threads > 0) - { + if( ds_num_threads > 0 ) DataSeriesSink::setCompressorCount(ds_num_threads); - } for ( int i = 0; i < num_fields; i++ ) { @@ -307,65 +286,59 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con val.ds_type = GetDSFieldType(field); val.field_name = string(field->name); val.field_options = GetDSOptionsForType(field); - val.bro_type = GetBroTypeString(field); + val.bro_type = field->TypeName(); schema_list.push_back(val); } + string schema = BuildDSSchemaFromFieldTypes(schema_list, path); - if(ds_dump_schema) + + if( ds_dump_schema ) { - FILE * pFile; - pFile = fopen ( string(path + ".ds.xml").c_str() , "wb" ); - if(NULL == pFile) + FILE* pFile = fopen ( string(path + ".ds.xml").c_str() , "wb" ); + + if( pFile ) { - perror("Could not dump schema"); + fwrite(schema.c_str(), 1, schema.length(), pFile); + fclose(pFile); } - fwrite (schema.c_str(), 1 , schema.length() , pFile ); - fclose (pFile); + + else + Error(Fmt("cannot dump schema: %s", strerror(errno))); } compress_type = Extent::compress_all; - if(ds_compression == "lzf") - { + if( ds_compression == "lzf" ) compress_type = Extent::compress_lzf; - } - else if(ds_compression == "lzo") - { + + else if( ds_compression == "lzo" ) compress_type = Extent::compress_lzo; - } - else if(ds_compression == "gz") - { + + else if( ds_compression == "gz" ) compress_type = Extent::compress_gz; - } - else if(ds_compression == "bz2") - { + + else if( ds_compression == "bz2" ) compress_type = Extent::compress_bz2; - } - else if(ds_compression == "none") - { + + else if( ds_compression == "none" ) compress_type = Extent::compress_none; - } - else if(ds_compression == "any") - { + + else if( ds_compression == "any" ) compress_type = Extent::compress_all; - } + else - { - fprintf(stderr, "%s is not a valid compression type. Valid types are: 'lzf', 'lzo', 'gz', 'bz2', 'none', 'any'\n", ds_compression.c_str()); - fprintf(stderr, "Defaulting to 'any'\n"); - } + Warning(Fmt("%s is not a valid compression type. Valid types are: 'lzf', 'lzo', 'gz', 'bz2', 'none', 'any'. Defaulting to 'any'", ds_compression.c_str())); log_type = const_cast(log_types.registerType(schema)); - log_series.setType(*log_type); return OpenLog(path); - } bool DataSeries::DoFlush() { - // Flushing is handled by DataSeries automatically, so this function doesn't do anything. + // Flushing is handled by DataSeries automatically, so this function + // doesn't do anything. return true; } @@ -377,7 +350,7 @@ void DataSeries::CloseLog() extents.clear(); // Don't delete the file before you delete the output, or bad things - // happen. + // will happen. delete log_output; delete log_file; @@ -396,14 +369,17 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields, threading::Value** vals) { log_output->newRecord(); - for(size_t i = 0; i < (size_t)num_fields; ++i) + + for( size_t i = 0; i < (size_t)num_fields; ++i ) { ExtentIterator iter = extents.find(fields[i]->name); assert(iter != extents.end()); + if( iter != extents.end() ) { GeneralField *cField = iter->second; - if(vals[i]->present) + + if( vals[i]->present ) cField->set(LogValueToString(vals[i])); } } @@ -413,7 +389,8 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields, bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating) { - // Note that if DS files are rotated too often, the aggregate log size will be (much) larger. + // Note that if DS files are rotated too often, the aggregate log + // size will be (much) larger. CloseLog(); string dsname = Path() + ".ds"; diff --git a/src/logging/writers/DataSeries.h b/src/logging/writers/DataSeries.h index 319cb72ec5..5faa87e1b2 100644 --- a/src/logging/writers/DataSeries.h +++ b/src/logging/writers/DataSeries.h @@ -6,13 +6,13 @@ #ifndef LOGGING_WRITER_DATA_SERIES_H #define LOGGING_WRITER_DATA_SERIES_H -#include "../WriterBackend.h" - #include #include #include #include +#include "../WriterBackend.h" + namespace logging { namespace writer { class DataSeries : public WriterBackend { @@ -24,6 +24,8 @@ public: { return new DataSeries(frontend); } protected: + // Overidden from WriterBackend. + virtual bool DoInit(string path, int num_fields, const threading::Field* const * fields); @@ -36,11 +38,11 @@ protected: virtual bool DoFinish(); private: - static const size_t ROW_MIN = 2048; // Minimum extent size. - static const size_t ROW_MAX = (1024 * 1024 * 100); // Maximum extent size. - static const size_t THREAD_MIN = 1; // Minimum number of compression threads that DataSeries may spawn. - static const size_t THREAD_MAX = 128; // Maximum number of compression threads that DataSeries may spawn. - static const size_t TIME_SCALE = 1000000; // Fixed-point multiplier for time values when converted to integers. + static const size_t ROW_MIN = 2048; // Minimum extent size. + static const size_t ROW_MAX = (1024 * 1024 * 100); // Maximum extent size. + static const size_t THREAD_MIN = 1; // Minimum number of compression threads that DataSeries may spawn. + static const size_t THREAD_MAX = 128; // Maximum number of compression threads that DataSeries may spawn. + static const size_t TIME_SCALE = 1000000; // Fixed-point multiplier for time values when converted to integers. struct SchemaValue { @@ -85,18 +87,10 @@ private: */ string BuildDSSchemaFromFieldTypes(const vector& vals, string sTitle); - /** - * Takes a field type and converts it to a readable string. - * - * @param field We extract the type from this and convert it into a readable string. - * @return String representation of the field's type - */ - string GetBroTypeString(const threading::Field *field); - /** Closes the currently open file. */ void CloseLog(); - /** XXX */ + /** Opens a new file. */ bool OpenLog(string path); typedef std::map ExtentMap; @@ -119,6 +113,7 @@ private: string ds_compression; bool ds_dump_schema; bool ds_use_integer_for_time; + string ds_set_separator; }; } diff --git a/src/threading/SerialTypes.cc b/src/threading/SerialTypes.cc index a5692b2ffd..5ab61b0d41 100644 --- a/src/threading/SerialTypes.cc +++ b/src/threading/SerialTypes.cc @@ -24,6 +24,20 @@ bool Field::Write(SerializationFormat* fmt) const return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype")); } +string Field::TypeName() const + { + string n = type_name(type); + + if ( (type == TYPE_TABLE) || (type == TYPE_VECTOR) ) + { + n += "["; + n += type_name(subtype); + n += "]"; + } + + return n; + } + Value::~Value() { if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC) diff --git a/src/threading/SerialTypes.h b/src/threading/SerialTypes.h index db7dc837bd..eee3b750fe 100644 --- a/src/threading/SerialTypes.h +++ b/src/threading/SerialTypes.h @@ -53,6 +53,12 @@ struct Field { * @return False if an error occured. */ bool Write(SerializationFormat* fmt) const; + + /** + * Returns a textual description of the field's type. This method is + * thread-safe. + */ + string TypeName() const; }; /** @@ -132,8 +138,8 @@ struct Value { /** * Returns true if the type can be represented by a Value. If - * `atomic_only` is true, will not permit composite types. - */ + * `atomic_only` is true, will not permit composite types. This + * method is thread-safe. */ static bool IsCompatibleType(BroType* t, bool atomic_only=false); private: