diff --git a/src/Attr.cc b/src/Attr.cc index b877250f52..40c6c1a75c 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -5,7 +5,7 @@ #include "Attr.h" #include "Expr.h" #include "Serializer.h" -#include "logging/Manager.h" +#include "threading/SerializationTypes.h" const char* attr_name(attr_tag t) { @@ -416,7 +416,7 @@ void Attributes::CheckAttr(Attr* a) break; case ATTR_LOG: - if ( ! logging::Value::IsCompatibleType(type) ) + if ( ! threading::Value::IsCompatibleType(type) ) Error("&log applied to a type that cannot be logged"); break; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 61a4847b70..7a3cc4babf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -411,6 +411,7 @@ set(bro_SRCS threading/BasicThread.cc threading/Manager.cc threading/MsgThread.cc + threading/SerializationTypes.cc logging/Manager.cc logging/WriterBackend.cc diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index a75812b42b..ba2598c018 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -184,6 +184,7 @@ #include "File.h" #include "Conn.h" #include "Reporter.h" +#include "threading/SerializationTypes.h" #include "logging/Manager.h" extern "C" { @@ -2476,7 +2477,7 @@ bool RemoteSerializer::ProcessRemotePrint() return true; } -bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields) +bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields) { loop_over_list(peers, i) { @@ -2486,7 +2487,7 @@ bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string return true; } -bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields) +bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields) { SetErrorDescr("logging"); @@ -2540,7 +2541,7 @@ error: return false; } -bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals) +bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Value* const * vals) { loop_over_list(peers, i) { @@ -2550,7 +2551,7 @@ bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, i return true; } -bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals) +bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Value* const * vals) { if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING ) return false; @@ -2641,7 +2642,7 @@ bool RemoteSerializer::ProcessLogCreateWriter() EnumVal* id_val = 0; EnumVal* writer_val = 0; - logging::Field** fields = 0; + threading::Field** fields = 0; BinarySerializationFormat fmt; fmt.StartRead(current_args->data, current_args->len); @@ -2658,11 +2659,11 @@ bool RemoteSerializer::ProcessLogCreateWriter() if ( ! success ) goto error; - fields = new logging::Field* [num_fields]; + fields = new threading::Field* [num_fields]; for ( int i = 0; i < num_fields; i++ ) { - fields[i] = new logging::Field; + fields[i] = new threading::Field; if ( ! fields[i]->Read(&fmt) ) goto error; } @@ -2703,7 +2704,7 @@ bool RemoteSerializer::ProcessLogWrite() // Unserialize one entry. EnumVal* id_val = 0; EnumVal* writer_val = 0; - logging::Value** vals = 0; + threading::Value** vals = 0; int id, writer; string path; @@ -2717,11 +2718,11 @@ bool RemoteSerializer::ProcessLogWrite() if ( ! success ) goto error; - vals = new logging::Value* [num_fields]; + vals = new threading::Value* [num_fields]; for ( int i = 0; i < num_fields; i++ ) { - vals[i] = new logging::Value; + vals[i] = new threading::Value; if ( ! vals[i]->Read(&fmt) ) goto error; } diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h index ba0bde7d41..571fa72d39 100644 --- a/src/RemoteSerializer.h +++ b/src/RemoteSerializer.h @@ -15,7 +15,7 @@ class IncrementalSendTimer; -namespace logging { +namespace threading { class Field; class Value; } @@ -102,13 +102,13 @@ public: bool SendPrintHookEvent(BroFile* f, const char* txt, size_t len); // Send a request to create a writer on a remote side. - bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields); + bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields); // Broadcasts a request to create a writer. - bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Field* const * fields); + bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields); // Broadcast a log entry to everybody interested. - bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals); + bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Value* const * vals); // Synchronzizes time with all connected peers. Returns number of // current sync-point, or -1 on error. @@ -303,7 +303,7 @@ protected: bool SendID(SerialInfo* info, Peer* peer, const ID& id); bool SendCapabilities(Peer* peer); bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p); - bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const logging::Value* const * vals); + bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Value* const * vals); void UnregisterHandlers(Peer* peer); void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0); diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 6d53ea363f..2333d6c612 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -14,7 +14,11 @@ #include "writers/Ascii.h" #include "writers/None.h" +#include "threading/SerializationTypes.h" + using namespace logging; +using threading::Value; +using threading::Field; // Structure describing a log writer type. struct WriterDefinition { @@ -83,316 +87,6 @@ struct Manager::Stream { ~Stream(); }; -bool Field::Read(SerializationFormat* fmt) - { - int t; - int st; - - bool success = (fmt->Read(&name, "name") && fmt->Read(&t, "type") && fmt->Read(&st, "subtype") ); - type = (TypeTag) t; - subtype = (TypeTag) st; - - return success; - } - -bool Field::Write(SerializationFormat* fmt) const - { - return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype")); - } - -Value::~Value() - { - if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC) - && present ) - delete val.string_val; - - if ( type == TYPE_TABLE && present ) - { - for ( int i = 0; i < val.set_val.size; i++ ) - delete val.set_val.vals[i]; - - delete [] val.set_val.vals; - } - - if ( type == TYPE_VECTOR && present ) - { - for ( int i = 0; i < val.vector_val.size; i++ ) - delete val.vector_val.vals[i]; - - delete [] val.vector_val.vals; - } - } - -bool Value::IsCompatibleType(BroType* t, bool atomic_only) - { - if ( ! t ) - return false; - - switch ( t->Tag() ) { - case TYPE_BOOL: - case TYPE_INT: - case TYPE_COUNT: - case TYPE_COUNTER: - case TYPE_PORT: - case TYPE_SUBNET: - case TYPE_ADDR: - case TYPE_DOUBLE: - case TYPE_TIME: - case TYPE_INTERVAL: - case TYPE_ENUM: - case TYPE_STRING: - case TYPE_FILE: - case TYPE_FUNC: - return true; - - case TYPE_RECORD: - return ! atomic_only; - - case TYPE_TABLE: - { - if ( atomic_only ) - return false; - - if ( ! t->IsSet() ) - return false; - - return IsCompatibleType(t->AsSetType()->Indices()->PureType(), true); - } - - case TYPE_VECTOR: - { - if ( atomic_only ) - return false; - - return IsCompatibleType(t->AsVectorType()->YieldType(), true); - } - - default: - return false; - } - - return false; - } - -bool Value::Read(SerializationFormat* fmt) - { - int ty; - - if ( ! (fmt->Read(&ty, "type") && fmt->Read(&present, "present")) ) - return false; - - type = (TypeTag)(ty); - - if ( ! present ) - return true; - - switch ( type ) { - case TYPE_BOOL: - case TYPE_INT: - return fmt->Read(&val.int_val, "int"); - - case TYPE_COUNT: - case TYPE_COUNTER: - case TYPE_PORT: - return fmt->Read(&val.uint_val, "uint"); - - case TYPE_SUBNET: - { - uint32 net[4]; - if ( ! (fmt->Read(&net[0], "net0") && - fmt->Read(&net[1], "net1") && - fmt->Read(&net[2], "net2") && - fmt->Read(&net[3], "net3") && - fmt->Read(&val.subnet_val.width, "width")) ) - return false; - -#ifdef BROv6 - val.subnet_val.net[0] = net[0]; - val.subnet_val.net[1] = net[1]; - val.subnet_val.net[2] = net[2]; - val.subnet_val.net[3] = net[3]; -#else - val.subnet_val.net = net[0]; -#endif - return true; - } - - case TYPE_ADDR: - { - uint32 addr[4]; - if ( ! (fmt->Read(&addr[0], "addr0") && - fmt->Read(&addr[1], "addr1") && - fmt->Read(&addr[2], "addr2") && - fmt->Read(&addr[3], "addr3")) ) - return false; - - val.addr_val[0] = addr[0]; -#ifdef BROv6 - val.addr_val[1] = addr[1]; - val.addr_val[2] = addr[2]; - val.addr_val[3] = addr[3]; -#endif - return true; - } - - case TYPE_DOUBLE: - case TYPE_TIME: - case TYPE_INTERVAL: - return fmt->Read(&val.double_val, "double"); - - case TYPE_ENUM: - case TYPE_STRING: - case TYPE_FILE: - case TYPE_FUNC: - { - val.string_val = new string; - return fmt->Read(val.string_val, "string"); - } - - case TYPE_TABLE: - { - if ( ! fmt->Read(&val.set_val.size, "set_size") ) - return false; - - val.set_val.vals = new Value* [val.set_val.size]; - - for ( int i = 0; i < val.set_val.size; ++i ) - { - val.set_val.vals[i] = new Value; - - if ( ! val.set_val.vals[i]->Read(fmt) ) - return false; - } - - return true; - } - - case TYPE_VECTOR: - { - if ( ! fmt->Read(&val.vector_val.size, "vector_size") ) - return false; - - val.vector_val.vals = new Value* [val.vector_val.size]; - - for ( int i = 0; i < val.vector_val.size; ++i ) - { - val.vector_val.vals[i] = new Value; - - if ( ! val.vector_val.vals[i]->Read(fmt) ) - return false; - } - - return true; - } - - default: - reporter->InternalError("unsupported type %s in Value::Write", type_name(type)); - } - - return false; - } - -bool Value::Write(SerializationFormat* fmt) const - { - if ( ! (fmt->Write((int)type, "type") && - fmt->Write(present, "present")) ) - return false; - - if ( ! present ) - return true; - - switch ( type ) { - case TYPE_BOOL: - case TYPE_INT: - return fmt->Write(val.int_val, "int"); - - case TYPE_COUNT: - case TYPE_COUNTER: - case TYPE_PORT: - return fmt->Write(val.uint_val, "uint"); - - case TYPE_SUBNET: - { - uint32 net[4]; -#ifdef BROv6 - net[0] = val.subnet_val.net[0]; - net[1] = val.subnet_val.net[1]; - net[2] = val.subnet_val.net[2]; - net[3] = val.subnet_val.net[3]; -#else - net[0] = val.subnet_val.net; - net[1] = net[2] = net[3] = 0; -#endif - return fmt->Write(net[0], "net0") && - fmt->Write(net[1], "net1") && - fmt->Write(net[2], "net2") && - fmt->Write(net[3], "net3") && - fmt->Write(val.subnet_val.width, "width"); - } - - case TYPE_ADDR: - { - uint32 addr[4]; - addr[0] = val.addr_val[0]; -#ifdef BROv6 - addr[1] = val.addr_val[1]; - addr[2] = val.addr_val[2]; - addr[3] = val.addr_val[3]; -#else - addr[1] = addr[2] = addr[3] = 0; -#endif - return fmt->Write(addr[0], "addr0") && - fmt->Write(addr[1], "addr1") && - fmt->Write(addr[2], "addr2") && - fmt->Write(addr[3], "addr3"); - } - - case TYPE_DOUBLE: - case TYPE_TIME: - case TYPE_INTERVAL: - return fmt->Write(val.double_val, "double"); - - case TYPE_ENUM: - case TYPE_STRING: - case TYPE_FILE: - case TYPE_FUNC: - return fmt->Write(*val.string_val, "string"); - - case TYPE_TABLE: - { - if ( ! fmt->Write(val.set_val.size, "set_size") ) - return false; - - for ( int i = 0; i < val.set_val.size; ++i ) - { - if ( ! val.set_val.vals[i]->Write(fmt) ) - return false; - } - - return true; - } - - case TYPE_VECTOR: - { - if ( ! fmt->Write(val.vector_val.size, "vector_size") ) - return false; - - for ( int i = 0; i < val.vector_val.size; ++i ) - { - if ( ! val.vector_val.vals[i]->Write(fmt) ) - return false; - } - - return true; - } - - default: - reporter->InternalError("unsupported type %s in Value::REad", type_name(type)); - } - - return false; - } - Manager::Filter::~Filter() { for ( int i = 0; i < num_fields; ++i ) @@ -552,7 +246,7 @@ bool Manager::CreateStream(EnumVal* id, RecordVal* sval) if ( ! (columns->FieldDecl(i)->FindAttr(ATTR_LOG)) ) continue; - if ( ! Value::IsCompatibleType(columns->FieldType(i)) ) + if ( ! threading::Value::IsCompatibleType(columns->FieldType(i)) ) { reporter->Error("type of field '%s' is not support for logging output", columns->FieldName(i)); @@ -1089,7 +783,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) if ( filter->local || filter->remote ) { - Value** vals = RecordToFilterVals(stream, filter, columns); + threading::Value** vals = RecordToFilterVals(stream, filter, columns); if ( filter->remote ) remote_serializer->SendLogWrite(stream->id, @@ -1125,15 +819,15 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) return true; } -Value* Manager::ValToLogVal(Val* val, BroType* ty) +threading::Value* Manager::ValToLogVal(Val* val, BroType* ty) { if ( ! ty ) ty = val->Type(); if ( ! val ) - return new Value(ty->Tag(), false); + return new threading::Value(ty->Tag(), false); - Value* lval = new Value(ty->Tag()); + threading::Value* lval = new threading::Value(ty->Tag()); switch ( lval->type ) { case TYPE_BOOL: @@ -1213,7 +907,7 @@ Value* Manager::ValToLogVal(Val* val, BroType* ty) set = new ListVal(TYPE_INT); lval->val.set_val.size = set->Length(); - lval->val.set_val.vals = new Value* [lval->val.set_val.size]; + lval->val.set_val.vals = new threading::Value* [lval->val.set_val.size]; for ( int i = 0; i < lval->val.set_val.size; i++ ) lval->val.set_val.vals[i] = ValToLogVal(set->Index(i)); @@ -1227,7 +921,7 @@ Value* Manager::ValToLogVal(Val* val, BroType* ty) VectorVal* vec = val->AsVectorVal(); lval->val.vector_val.size = vec->Size(); lval->val.vector_val.vals = - new Value* [lval->val.vector_val.size]; + new threading::Value* [lval->val.vector_val.size]; for ( int i = 0; i < lval->val.vector_val.size; i++ ) { diff --git a/src/logging/Manager.h b/src/logging/Manager.h index f6829b3554..c5d1a9fc2d 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -15,118 +15,6 @@ class RotationTimer; namespace logging { -/** - * Definition of a log file, i.e., one column of a log stream. - */ -struct Field { - string name; //! Name of the field. - TypeTag type; //! Type of the field. - TypeTag subtype; //! Inner type for sets. - - /** - * Constructor. - */ - Field() { subtype = TYPE_VOID; } - - /** - * Copy constructor. - */ - Field(const Field& other) - : name(other.name), type(other.type), subtype(other.subtype) { } - - /** - * Unserializes a field. - * - * @param fmt The serialization format to use. The format handles - * low-level I/O. - * - * @return False if an error occured. - */ - bool Read(SerializationFormat* fmt); - - /** - * Serializes a field. - * - * @param fmt The serialization format to use. The format handles - * low-level I/O. - * - * @return False if an error occured. - */ - bool Write(SerializationFormat* fmt) const; -}; - -/** - * Definition of a log value, i.e., a entry logged by a stream. - * - * This struct essentialy represents a serialization of a Val instance (for - * those Vals supported). - */ -struct Value { - TypeTag type; //! The type of the value. - bool present; //! False for optional record fields that are not set. - - struct set_t { bro_int_t size; Value** vals; }; - typedef set_t vec_t; - - /** - * This union is a subset of BroValUnion, including only the types we - * can log directly. See IsCompatibleType(). - */ - union _val { - bro_int_t int_val; - bro_uint_t uint_val; - uint32 addr_val[NUM_ADDR_WORDS]; - subnet_type subnet_val; - double double_val; - string* string_val; - set_t set_val; - vec_t vector_val; - } val; - - /** - * Constructor. - * - * arg_type: The type of the value. - * - * arg_present: False if the value represents an optional record field - * that is not set. - */ - Value(TypeTag arg_type = TYPE_ERROR, bool arg_present = true) - : type(arg_type), present(arg_present) {} - - /** - * Destructor. - */ - ~Value(); - - /** - * Unserializes a value. - * - * @param fmt The serialization format to use. The format handles low-level I/O. - * - * @return False if an error occured. - */ - bool Read(SerializationFormat* fmt); - - /** - * Serializes a value. - * - * @param fmt The serialization format to use. The format handles - * low-level I/O. - * - * @return False if an error occured. - */ - bool Write(SerializationFormat* fmt) const; - - /** - * Returns true if the type can be represented by a Value. If - * `atomic_only` is true, will not permit composite types. - */ - static bool IsCompatibleType(BroType* t, bool atomic_only=false); - -private: - Value(const Value& other) { } // Disabled. -}; class WriterBackend; class WriterFrontend; @@ -168,7 +56,7 @@ public: * logging.bif, which just forwards here. */ bool EnableStream(EnumVal* id); - + /** * Disables a log stream. * @@ -265,11 +153,11 @@ protected: // Takes ownership of fields. WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path, - int num_fields, const Field* const* fields); + int num_fields, const threading::Field* const* fields); // Takes ownership of values.. bool Write(EnumVal* id, EnumVal* writer, string path, - int num_fields, Value** vals); + int num_fields, threading::Value** vals); // Announces all instantiated writers to peer. void SendAllWritersTo(RemoteSerializer::PeerID peer); @@ -282,7 +170,7 @@ protected: void Error(WriterFrontend* writer, const char* msg); // Deletes the values as passed into Write(). - void DeleteVals(int num_fields, Value** vals); + void DeleteVals(int num_fields, threading::Value** vals); private: struct Filter; @@ -292,10 +180,10 @@ private: bool TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, string path, list indices); - Value** RecordToFilterVals(Stream* stream, Filter* filter, + threading::Value** RecordToFilterVals(Stream* stream, Filter* filter, RecordVal* columns); - Value* ValToLogVal(Val* val, BroType* ty = 0); + threading::Value* ValToLogVal(Val* val, BroType* ty = 0); Stream* FindStream(EnumVal* id); void RemoveDisabledWriters(Stream* stream); void InstallRotationTimer(WriterInfo* winfo); diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 4d2e497b14..3ecc54e240 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -4,9 +4,13 @@ #include "WriterBackend.h" #include "WriterFrontend.h" +#include "../threading/SerializationTypes.h" // Messages sent from backend to frontend (i.e., "OutputMessages"). +using threading::Value; +using threading::Field; + namespace logging { class RotationFinishedMessage : public threading::OutputMessage diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index 33271e43f9..9ffa26d0c8 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -55,7 +55,7 @@ public: * * @return False if an error occured. */ - bool Init(string path, int num_fields, const Field* const* fields); + bool Init(string path, int num_fields, const threading::Field* const* fields); /** * Writes one log entry. @@ -72,7 +72,7 @@ public: * * @return False if an error occured. */ - bool Write(int num_fields, int num_writes, Value*** vals); + bool Write(int num_fields, int num_writes, threading::Value*** vals); /** * Sets the buffering status for the writer, assuming the writer @@ -129,7 +129,7 @@ public: /** * Returns the log fields as passed into the constructor. */ - const Field* const * Fields() const { return fields; } + const threading::Field* const * Fields() const { return fields; } /** * Returns the current buffering state. @@ -170,7 +170,7 @@ protected: * implementation should also call Error() to indicate what happened. */ virtual bool DoInit(string path, int num_fields, - const Field* const* fields) = 0; + const threading::Field* const* fields) = 0; /** * Writer-specific output method implementing recording of fone log @@ -182,8 +182,8 @@ protected: * disabled and eventually deleted. When returning false, an * implementation should also call Error() to indicate what happened. */ - virtual bool DoWrite(int num_fields, const Field* const* fields, - Value** vals) = 0; + virtual bool DoWrite(int num_fields, const threading::Field* const* fields, + threading::Value** vals) = 0; /** * Writer-specific method implementing a change of fthe buffering @@ -287,7 +287,7 @@ private: /** * Deletes the values as passed into Write(). */ - void DeleteVals(int num_writes, Value*** vals); + void DeleteVals(int num_writes, threading::Value*** vals); // Frontend that instantiated us. This object must not be access from // this class, it's running in a different thread! @@ -295,7 +295,7 @@ private: string path; // Log path. int num_fields; // Number of log fields. - const Field* const* fields; // Log fields. + const threading::Field* const* fields; // Log fields. bool buffering; // True if buffering is enabled. }; diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 79278870f9..1f7af5a53d 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -1,6 +1,10 @@ #include "WriterFrontend.h" #include "WriterBackend.h" +#include "../threading/SerializationTypes.h" + +using threading::Value; +using threading::Field; namespace logging { diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index e0bc590dfc..56c8885cf9 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -64,7 +64,7 @@ public: * * This method must only be called from the main thread. */ - void Init(string path, int num_fields, const Field* const* fields); + void Init(string path, int num_fields, const threading::Field* const* fields); /** * Write out a record. @@ -86,7 +86,7 @@ public: * * This method must only be called from the main thread. */ - void Write(int num_fields, Value** vals); + void Write(int num_fields, threading::Value** vals); /** * Sets the buffering state. @@ -185,7 +185,7 @@ public: /** * Returns the log fields as passed into the constructor. */ - const Field* const * Fields() const { return fields; } + const threading::Field* const * Fields() const { return fields; } protected: friend class Manager; @@ -198,12 +198,12 @@ protected: string ty_name; // Name of the backend type. Set by the manager. string path; // The log path. int num_fields; // The number of log fields. - const Field* const* fields; // The log fields. + const threading::Field* const* fields; // The log fields. // Buffer for bulk writes. static const int WRITER_BUFFER_SIZE = 50; int write_buffer_pos; // Position of next write in buffer. - Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE. + threading::Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE. }; } diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 7cc8459e68..fc6832afea 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -6,9 +6,12 @@ #include "../../NetVar.h" #include "Ascii.h" +#include "../../threading/SerializationTypes.h" using namespace logging; using namespace writer; +using threading::Value; +using threading::Field; Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) { diff --git a/src/logging/writers/Ascii.h b/src/logging/writers/Ascii.h index 4a24aad9b7..6f507aff01 100644 --- a/src/logging/writers/Ascii.h +++ b/src/logging/writers/Ascii.h @@ -20,9 +20,9 @@ public: protected: virtual bool DoInit(string path, int num_fields, - const Field* const* fields); - virtual bool DoWrite(int num_fields, const Field* const* fields, - Value** vals); + const threading::Field* const* fields); + virtual bool DoWrite(int num_fields, const threading::Field* const* fields, + threading::Value** vals); virtual bool DoSetBuf(bool enabled); virtual bool DoRotate(string rotated_path, double open, double close, bool terminating); @@ -31,7 +31,7 @@ protected: private: bool IsSpecial(string path) { return path.find("/dev/") == 0; } - bool DoWriteOne(ODesc* desc, Value* val, const Field* field); + bool DoWriteOne(ODesc* desc, threading::Value* val, const threading::Field* field); bool WriteHeaderField(const string& key, const string& value); FILE* file; diff --git a/src/logging/writers/None.h b/src/logging/writers/None.h index 6a62161f49..cce48953d1 100644 --- a/src/logging/writers/None.h +++ b/src/logging/writers/None.h @@ -19,10 +19,10 @@ public: protected: virtual bool DoInit(string path, int num_fields, - const Field* const * fields) { return true; } + const threading::Field* const * fields) { return true; } - virtual bool DoWrite(int num_fields, const Field* const* fields, - Value** vals) { return true; } + virtual bool DoWrite(int num_fields, const threading::Field* const* fields, + threading::Value** vals) { return true; } virtual bool DoSetBuf(bool enabled) { return true; } virtual bool DoRotate(string rotated_path, double open, double close, bool terminating); diff --git a/src/threading/SerializationTypes.cc b/src/threading/SerializationTypes.cc new file mode 100644 index 0000000000..01f0ac84ce --- /dev/null +++ b/src/threading/SerializationTypes.cc @@ -0,0 +1,319 @@ +// See the file "COPYING" in the main distribution directory for copyright. + + +#include "SerializationTypes.h" +#include "../RemoteSerializer.h" + + +using namespace threading; + +bool Field::Read(SerializationFormat* fmt) + { + int t; + int st; + + bool success = (fmt->Read(&name, "name") && fmt->Read(&t, "type") && fmt->Read(&st, "subtype") ); + type = (TypeTag) t; + subtype = (TypeTag) st; + + return success; + } + +bool Field::Write(SerializationFormat* fmt) const + { + return (fmt->Write(name, "name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype")); + } + +Value::~Value() + { + if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC) + && present ) + delete val.string_val; + + if ( type == TYPE_TABLE && present ) + { + for ( int i = 0; i < val.set_val.size; i++ ) + delete val.set_val.vals[i]; + + delete [] val.set_val.vals; + } + + if ( type == TYPE_VECTOR && present ) + { + for ( int i = 0; i < val.vector_val.size; i++ ) + delete val.vector_val.vals[i]; + + delete [] val.vector_val.vals; + } + } + +bool Value::IsCompatibleType(BroType* t, bool atomic_only) + { + if ( ! t ) + return false; + + switch ( t->Tag() ) { + case TYPE_BOOL: + case TYPE_INT: + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + case TYPE_SUBNET: + case TYPE_ADDR: + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + case TYPE_ENUM: + case TYPE_STRING: + case TYPE_FILE: + case TYPE_FUNC: + return true; + + case TYPE_RECORD: + return ! atomic_only; + + case TYPE_TABLE: + { + if ( atomic_only ) + return false; + + if ( ! t->IsSet() ) + return false; + + return IsCompatibleType(t->AsSetType()->Indices()->PureType(), true); + } + + case TYPE_VECTOR: + { + if ( atomic_only ) + return false; + + return IsCompatibleType(t->AsVectorType()->YieldType(), true); + } + + default: + return false; + } + + return false; + } + +bool Value::Read(SerializationFormat* fmt) + { + int ty; + + if ( ! (fmt->Read(&ty, "type") && fmt->Read(&present, "present")) ) + return false; + + type = (TypeTag)(ty); + + if ( ! present ) + return true; + + switch ( type ) { + case TYPE_BOOL: + case TYPE_INT: + return fmt->Read(&val.int_val, "int"); + + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + return fmt->Read(&val.uint_val, "uint"); + + case TYPE_SUBNET: + { + uint32 net[4]; + if ( ! (fmt->Read(&net[0], "net0") && + fmt->Read(&net[1], "net1") && + fmt->Read(&net[2], "net2") && + fmt->Read(&net[3], "net3") && + fmt->Read(&val.subnet_val.width, "width")) ) + return false; + +#ifdef BROv6 + val.subnet_val.net[0] = net[0]; + val.subnet_val.net[1] = net[1]; + val.subnet_val.net[2] = net[2]; + val.subnet_val.net[3] = net[3]; +#else + val.subnet_val.net = net[0]; +#endif + return true; + } + + case TYPE_ADDR: + { + uint32 addr[4]; + if ( ! (fmt->Read(&addr[0], "addr0") && + fmt->Read(&addr[1], "addr1") && + fmt->Read(&addr[2], "addr2") && + fmt->Read(&addr[3], "addr3")) ) + return false; + + val.addr_val[0] = addr[0]; +#ifdef BROv6 + val.addr_val[1] = addr[1]; + val.addr_val[2] = addr[2]; + val.addr_val[3] = addr[3]; +#endif + return true; + } + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + return fmt->Read(&val.double_val, "double"); + + case TYPE_ENUM: + case TYPE_STRING: + case TYPE_FILE: + case TYPE_FUNC: + { + val.string_val = new string; + return fmt->Read(val.string_val, "string"); + } + + case TYPE_TABLE: + { + if ( ! fmt->Read(&val.set_val.size, "set_size") ) + return false; + + val.set_val.vals = new Value* [val.set_val.size]; + + for ( int i = 0; i < val.set_val.size; ++i ) + { + val.set_val.vals[i] = new Value; + + if ( ! val.set_val.vals[i]->Read(fmt) ) + return false; + } + + return true; + } + + case TYPE_VECTOR: + { + if ( ! fmt->Read(&val.vector_val.size, "vector_size") ) + return false; + + val.vector_val.vals = new Value* [val.vector_val.size]; + + for ( int i = 0; i < val.vector_val.size; ++i ) + { + val.vector_val.vals[i] = new Value; + + if ( ! val.vector_val.vals[i]->Read(fmt) ) + return false; + } + + return true; + } + + default: + reporter->InternalError("unsupported type %s in Value::Write", type_name(type)); + } + + return false; + } + +bool Value::Write(SerializationFormat* fmt) const + { + if ( ! (fmt->Write((int)type, "type") && + fmt->Write(present, "present")) ) + return false; + + if ( ! present ) + return true; + + switch ( type ) { + case TYPE_BOOL: + case TYPE_INT: + return fmt->Write(val.int_val, "int"); + + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + return fmt->Write(val.uint_val, "uint"); + + case TYPE_SUBNET: + { + uint32 net[4]; +#ifdef BROv6 + net[0] = val.subnet_val.net[0]; + net[1] = val.subnet_val.net[1]; + net[2] = val.subnet_val.net[2]; + net[3] = val.subnet_val.net[3]; +#else + net[0] = val.subnet_val.net; + net[1] = net[2] = net[3] = 0; +#endif + return fmt->Write(net[0], "net0") && + fmt->Write(net[1], "net1") && + fmt->Write(net[2], "net2") && + fmt->Write(net[3], "net3") && + fmt->Write(val.subnet_val.width, "width"); + } + + case TYPE_ADDR: + { + uint32 addr[4]; + addr[0] = val.addr_val[0]; +#ifdef BROv6 + addr[1] = val.addr_val[1]; + addr[2] = val.addr_val[2]; + addr[3] = val.addr_val[3]; +#else + addr[1] = addr[2] = addr[3] = 0; +#endif + return fmt->Write(addr[0], "addr0") && + fmt->Write(addr[1], "addr1") && + fmt->Write(addr[2], "addr2") && + fmt->Write(addr[3], "addr3"); + } + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + return fmt->Write(val.double_val, "double"); + + case TYPE_ENUM: + case TYPE_STRING: + case TYPE_FILE: + case TYPE_FUNC: + return fmt->Write(*val.string_val, "string"); + + case TYPE_TABLE: + { + if ( ! fmt->Write(val.set_val.size, "set_size") ) + return false; + + for ( int i = 0; i < val.set_val.size; ++i ) + { + if ( ! val.set_val.vals[i]->Write(fmt) ) + return false; + } + + return true; + } + + case TYPE_VECTOR: + { + if ( ! fmt->Write(val.vector_val.size, "vector_size") ) + return false; + + for ( int i = 0; i < val.vector_val.size; ++i ) + { + if ( ! val.vector_val.vals[i]->Write(fmt) ) + return false; + } + + return true; + } + + default: + reporter->InternalError("unsupported type %s in Value::REad", type_name(type)); + } + + return false; + } + diff --git a/src/threading/SerializationTypes.h b/src/threading/SerializationTypes.h new file mode 100644 index 0000000000..8cae99c117 --- /dev/null +++ b/src/threading/SerializationTypes.h @@ -0,0 +1,126 @@ + +#ifndef THREADING_SERIALIZATIONTYPES_H +#define THREADING_SERIALIZATIONTZPES_H + +#include "../RemoteSerializer.h" + +using namespace std; + +namespace threading { + +/** + * Definition of a log file, i.e., one column of a log stream. + */ +struct Field { + string name; //! Name of the field. + TypeTag type; //! Type of the field. + TypeTag subtype; //! Inner type for sets. + + /** + * Constructor. + */ + Field() { subtype = TYPE_VOID; } + + /** + * Copy constructor. + */ + Field(const Field& other) + : name(other.name), type(other.type), subtype(other.subtype) { } + + /** + * Unserializes a field. + * + * @param fmt The serialization format to use. The format handles + * low-level I/O. + * + * @return False if an error occured. + */ + bool Read(SerializationFormat* fmt); + + /** + * Serializes a field. + * + * @param fmt The serialization format to use. The format handles + * low-level I/O. + * + * @return False if an error occured. + */ + bool Write(SerializationFormat* fmt) const; +}; + +/** + * Definition of a log value, i.e., a entry logged by a stream. + * + * This struct essentialy represents a serialization of a Val instance (for + * those Vals supported). + */ +struct Value { + TypeTag type; //! The type of the value. + bool present; //! False for optional record fields that are not set. + + struct set_t { bro_int_t size; Value** vals; }; + typedef set_t vec_t; + + /** + * This union is a subset of BroValUnion, including only the types we + * can log directly. See IsCompatibleType(). + */ + union _val { + bro_int_t int_val; + bro_uint_t uint_val; + uint32 addr_val[NUM_ADDR_WORDS]; + subnet_type subnet_val; + double double_val; + string* string_val; + set_t set_val; + vec_t vector_val; + } val; + + /** + * Constructor. + * + * arg_type: The type of the value. + * + * arg_present: False if the value represents an optional record field + * that is not set. + */ + Value(TypeTag arg_type = TYPE_ERROR, bool arg_present = true) + : type(arg_type), present(arg_present) {} + + /** + * Destructor. + */ + ~Value(); + + /** + * Unserializes a value. + * + * @param fmt The serialization format to use. The format handles low-level I/O. + * + * @return False if an error occured. + */ + bool Read(SerializationFormat* fmt); + + /** + * Serializes a value. + * + * @param fmt The serialization format to use. The format handles + * low-level I/O. + * + * @return False if an error occured. + */ + bool Write(SerializationFormat* fmt) const; + + /** + * Returns true if the type can be represented by a Value. If + * `atomic_only` is true, will not permit composite types. + */ + static bool IsCompatibleType(BroType* t, bool atomic_only=false); + +private: + Value(const Value& other) { } // Disabled. +}; + +} + +#endif /* THREADING_SERIALIZATIONTZPES_H */