From 87e10b5f97a897f8c5fac2f983379a8c8966dcae Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Wed, 18 Jul 2012 12:47:13 -0700 Subject: [PATCH] Further threading and API restructuring for logging and input frameworks. There were a number of cases that weren't thread-safe. In particular, we don't use std::string anymore for anything that's passed between threads (but instead plain old const char*, with manual memmory managmenet). This is still a check-point commit, I'll do more testing. --- src/RemoteSerializer.cc | 4 +- src/input/Manager.cc | 68 ++++++------ src/input/ReaderBackend.cc | 21 ++-- src/input/ReaderBackend.h | 49 +++++++-- src/input/ReaderFrontend.cc | 30 +++--- src/input/ReaderFrontend.h | 27 ++--- src/input/readers/Ascii.cc | 23 +++-- src/input/readers/Benchmark.cc | 8 +- src/input/readers/Raw.cc | 10 +- src/logging/Manager.cc | 89 +++++++++------- src/logging/Manager.h | 6 +- src/logging/WriterBackend.cc | 40 +++++--- src/logging/WriterBackend.h | 51 +++++++-- src/logging/WriterFrontend.cc | 46 ++++----- src/logging/WriterFrontend.h | 18 ++-- src/logging/writers/Ascii.cc | 45 ++++---- src/logging/writers/Ascii.h | 2 +- src/logging/writers/DataSeries.cc | 15 +-- src/logging/writers/DataSeries.h | 2 +- src/logging/writers/None.cc | 21 +++- src/logging/writers/None.h | 2 +- src/threading/BasicThread.cc | 95 ++++++++++++----- src/threading/BasicThread.h | 55 ++++++++-- src/threading/Manager.cc | 19 +++- src/threading/MsgThread.cc | 137 +++++++++++++------------ src/threading/MsgThread.h | 18 ++-- src/threading/Queue.h | 82 ++++++++++++--- src/threading/SerialTypes.cc | 48 +++++++-- src/threading/SerialTypes.h | 34 ++++-- testing/btest/istate/events.bro | 4 +- testing/scripts/diff-remove-timestamps | 4 +- 31 files changed, 692 insertions(+), 381 deletions(-) diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 9409a34634..7ed8b9318e 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -2692,12 +2692,12 @@ bool RemoteSerializer::ProcessLogCreateWriter() int id, writer; int num_fields; - logging::WriterBackend::WriterInfo info; + logging::WriterBackend::WriterInfo* info = new logging::WriterBackend::WriterInfo(); bool success = fmt.Read(&id, "id") && fmt.Read(&writer, "writer") && fmt.Read(&num_fields, "num_fields") && - info.Read(&fmt); + info->Read(&fmt); if ( ! success ) goto error; diff --git a/src/input/Manager.cc b/src/input/Manager.cc index 1c6b69e8ec..f38613a6f8 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -71,7 +71,7 @@ declare(PDict, InputHash); class Manager::Stream { public: string name; - ReaderBackend::ReaderInfo info; + ReaderBackend::ReaderInfo* info; bool removed; StreamType stream_type; // to distinguish between event and table streams @@ -257,7 +257,6 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type) assert(ir->factory); - frontend->SetTypeName(ir->name); ReaderBackend* backend = (*ir->factory)(frontend); assert(backend); @@ -291,9 +290,6 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); - ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt()); - assert(reader_obj); - // get the source ... Val* sourceval = description->LookupWithDefault(rtype->FieldOffset("source")); assert ( sourceval != 0 ); @@ -301,21 +297,22 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) string source((const char*) bsource->Bytes(), bsource->Len()); Unref(sourceval); - EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); - Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); + ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo(); + rinfo->source = copy_string(source.c_str()); + EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); switch ( mode->InternalInt() ) { case 0: - info->info.mode = MODE_MANUAL; + rinfo->mode = MODE_MANUAL; break; case 1: - info->info.mode = MODE_REREAD; + rinfo->mode = MODE_REREAD; break; case 2: - info->info.mode = MODE_STREAM; + rinfo->mode = MODE_STREAM; break; default: @@ -324,12 +321,16 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) Unref(mode); + Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); + + ReaderFrontend* reader_obj = new ReaderFrontend(*rinfo, reader); + assert(reader_obj); + info->reader = reader_obj; info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault info->name = name; info->config = config->AsTableVal(); // ref'd by LookupWithDefault - - info->info.source = source; + info->info = rinfo; Ref(description); info->description = description; @@ -344,7 +345,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description) ListVal* index = info->config->RecoverIndex(k); string key = index->Index(0)->AsString()->CheckString(); string value = v->Value()->AsString()->CheckString(); - info->info.config.insert(std::make_pair(key, value)); + info->info->config.insert(std::make_pair(copy_string(key.c_str()), copy_string(value.c_str()))); Unref(index); delete k; } @@ -475,7 +476,7 @@ bool Manager::CreateEventStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->info, stream->num_fields, logf ); + stream->reader->Init(stream->num_fields, logf ); readers[stream->reader] = stream; @@ -652,7 +653,7 @@ bool Manager::CreateTableStream(RecordVal* fval) assert(stream->reader); - stream->reader->Init(stream->info, fieldsV.size(), fields ); + stream->reader->Init(fieldsV.size(), fields ); readers[stream->reader] = stream; @@ -791,17 +792,19 @@ bool Manager::UnrollRecordType(vector *fields, else { - Field* field = new Field(); - field->name = nameprepend + rec->FieldName(i); - field->type = rec->FieldType(i)->Tag(); + string name = nameprepend + rec->FieldName(i); + const char* secondary = 0; + TypeTag ty = rec->FieldType(i)->Tag(); + TypeTag st = TYPE_VOID; + bool optional = false; - if ( field->type == TYPE_TABLE ) - field->subtype = rec->FieldType(i)->AsSetType()->Indices()->PureType()->Tag(); + if ( ty == TYPE_TABLE ) + st = rec->FieldType(i)->AsSetType()->Indices()->PureType()->Tag(); - else if ( field->type == TYPE_VECTOR ) - field->subtype = rec->FieldType(i)->AsVectorType()->YieldType()->Tag(); + else if ( ty == TYPE_VECTOR ) + st = rec->FieldType(i)->AsVectorType()->YieldType()->Tag(); - else if ( field->type == TYPE_PORT && + else if ( ty == TYPE_PORT && rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) ) { // we have an annotation for the second column @@ -811,12 +814,13 @@ bool Manager::UnrollRecordType(vector *fields, assert(c); assert(c->Type()->Tag() == TYPE_STRING); - field->secondary_name = c->AsStringVal()->AsString()->CheckString(); + secondary = c->AsStringVal()->AsString()->CheckString(); } if ( rec->FieldDecl(i)->FindAttr(ATTR_OPTIONAL ) ) - field->optional = true; + optional = true; + Field* field = new Field(name.c_str(), secondary, ty, st, optional); fields->push_back(field); } } @@ -1230,7 +1234,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader) #endif // Send event that the current update is indeed finished. - SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info.source.c_str())); + SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); } void Manager::Put(ReaderFrontend* reader, Value* *vals) @@ -1707,7 +1711,7 @@ int Manager::GetValueLength(const Value* val) { case TYPE_STRING: case TYPE_ENUM: { - length += val->val.string_val->size(); + length += val->val.string_val.length; break; } @@ -1806,8 +1810,8 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val) case TYPE_STRING: case TYPE_ENUM: { - memcpy(data+startpos, val->val.string_val->c_str(), val->val.string_val->length()); - return val->val.string_val->size(); + memcpy(data+startpos, val->val.string_val.data, val->val.string_val.length); + return val->val.string_val.length; } case TYPE_ADDR: @@ -1955,7 +1959,7 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) case TYPE_STRING: { - BroString *s = new BroString(*(val->val.string_val)); + BroString *s = new BroString((const u_char*)val->val.string_val.data, val->val.string_val.length, 0); return new StringVal(s); } @@ -2039,8 +2043,8 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) case TYPE_ENUM: { // well, this is kind of stupid, because EnumType just mangles the module name and the var name together again... // but well - string module = extract_module_name(val->val.string_val->c_str()); - string var = extract_var_name(val->val.string_val->c_str()); + string module = extract_module_name(val->val.string_val.data); + string var = extract_var_name(val->val.string_val.data); bro_int_t index = request_type->AsEnumType()->Lookup(module, var.c_str()); if ( index == -1 ) reporter->InternalError("Value not found in enum mappimg. Module: %s, var: %s", diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc index 84106a3c94..88a78c3cd7 100644 --- a/src/input/ReaderBackend.cc +++ b/src/input/ReaderBackend.cc @@ -56,22 +56,24 @@ private: class SendEventMessage : public threading::OutputMessage { public: - SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, Value* *val) + SendEventMessage(ReaderFrontend* reader, const char* name, const int num_vals, Value* *val) : threading::OutputMessage("SendEvent", reader), - name(name), num_vals(num_vals), val(val) {} + name(copy_string(name)), num_vals(num_vals), val(val) {} + + virtual ~SendEventMessage() { delete [] name; } virtual bool Process() { bool success = input_mgr->SendEvent(name, num_vals, val); if ( ! success ) - reporter->Error("SendEvent for event %s failed", name.c_str()); + reporter->Error("SendEvent for event %s failed", name); return true; // We do not want to die if sendEvent fails because the event did not return. } private: - const string name; + const char* name; const int num_vals; Value* *val; }; @@ -146,12 +148,14 @@ ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() { disabled = true; // disabled will be set correcty in init. frontend = arg_frontend; + info = new ReaderInfo(frontend->Info()); SetName(frontend->Name()); } ReaderBackend::~ReaderBackend() { + delete info; } void ReaderBackend::Put(Value* *val) @@ -169,7 +173,7 @@ void ReaderBackend::Clear() SendOut(new ClearMessage(frontend)); } -void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) +void ReaderBackend::SendEvent(const char* name, const int num_vals, Value* *vals) { SendOut(new SendEventMessage(frontend, name, num_vals, vals)); } @@ -184,17 +188,14 @@ void ReaderBackend::SendEntry(Value* *vals) SendOut(new SendEntryMessage(frontend, vals)); } -bool ReaderBackend::Init(const ReaderInfo& arg_info, const int arg_num_fields, +bool ReaderBackend::Init(const int arg_num_fields, const threading::Field* const* arg_fields) { - info = arg_info; num_fields = arg_num_fields; fields = arg_fields; - SetName("InputReader/"+info.source); - // disable if DoInit returns error. - int success = DoInit(arg_info, arg_num_fields, arg_fields); + int success = DoInit(*info, arg_num_fields, arg_fields); if ( ! success ) { diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h index 1e77a61f37..7626cc25ed 100644 --- a/src/input/ReaderBackend.h +++ b/src/input/ReaderBackend.h @@ -34,7 +34,10 @@ enum ReaderMode { * for new appended data. When new data is appended is has to be sent * using the Put api functions. */ - MODE_STREAM + MODE_STREAM, + + /** Internal dummy mode for initialization. */ + MODE_NONE }; class ReaderFrontend; @@ -70,14 +73,17 @@ public: */ struct ReaderInfo { - typedef std::map config_map; + // Structure takes ownership of the strings. + typedef std::map config_map; /** * A string left to the interpretation of the reader * implementation; it corresponds to the value configured on * the script-level for the logging filter. + * + * Structure takes ownership of the string. */ - string source; + const char* source; /** * A map of key/value pairs corresponding to the relevant @@ -89,6 +95,35 @@ public: * The opening mode for the input source. */ ReaderMode mode; + + ReaderInfo() + { + source = 0; + mode = MODE_NONE; + } + + ReaderInfo(const ReaderInfo& other) + { + source = other.source ? copy_string(other.source) : 0; + mode = other.mode; + + for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ ) + config.insert(std::make_pair(copy_string(i->first), copy_string(i->second))); + } + + ~ReaderInfo() + { + delete [] source; + + for ( config_map::iterator i = config.begin(); i != config.end(); i++ ) + { + delete [] i->first; + delete [] i->second; + } + } + + private: + const ReaderInfo& operator=(const ReaderInfo& other); // Disable. }; /** @@ -106,7 +141,7 @@ public: * * @return False if an error occured. */ - bool Init(const ReaderInfo& info, int num_fields, const threading::Field* const* fields); + bool Init(int num_fields, const threading::Field* const* fields); /** * Force trigger an update of the input stream. The action that will @@ -133,7 +168,7 @@ public: /** * Returns the additional reader information into the constructor. */ - const ReaderInfo& Info() const { return info; } + const ReaderInfo& Info() const { return *info; } /** * Returns the number of log fields as passed into the constructor. @@ -209,7 +244,7 @@ protected: * * @param vals the values to be given to the event */ - void SendEvent(const string& name, const int num_vals, threading::Value* *vals); + void SendEvent(const char* name, const int num_vals, threading::Value* *vals); // Content-sending-functions (simple mode). Include table-specific // functionality that simply is not used if we have no table. @@ -291,7 +326,7 @@ private: // from this class, it's running in a different thread! ReaderFrontend* frontend; - ReaderInfo info; + ReaderInfo* info; unsigned int num_fields; const threading::Field* const * fields; // raw mapping diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc index 7e4ef201b1..a8528c002d 100644 --- a/src/input/ReaderFrontend.cc +++ b/src/input/ReaderFrontend.cc @@ -11,18 +11,17 @@ namespace input { class InitMessage : public threading::InputMessage { public: - InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, + InitMessage(ReaderBackend* backend, const int num_fields, const threading::Field* const* fields) : threading::InputMessage("Init", backend), - info(info), num_fields(num_fields), fields(fields) { } + num_fields(num_fields), fields(fields) { } virtual bool Process() { - return Object()->Init(info, num_fields, fields); + return Object()->Init(num_fields, fields); } private: - const ReaderBackend::ReaderInfo info; const int num_fields; const threading::Field* const* fields; }; @@ -37,21 +36,26 @@ public: virtual bool Process() { return Object()->Update(); } }; -ReaderFrontend::ReaderFrontend(bro_int_t type) +ReaderFrontend::ReaderFrontend(const ReaderBackend::ReaderInfo& arg_info, EnumVal* type) { disabled = initialized = false; - ty_name = ""; - backend = input_mgr->CreateBackend(this, type); + info = new ReaderBackend::ReaderInfo(arg_info); + const char* t = type->Type()->AsEnumType()->Lookup(type->InternalInt()); + name = copy_string(fmt("%s/%s", arg_info.source, t)); + + backend = input_mgr->CreateBackend(this, type->InternalInt()); assert(backend); backend->Start(); } ReaderFrontend::~ReaderFrontend() { + delete [] name; + delete info; } -void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int arg_num_fields, +void ReaderFrontend::Init(const int arg_num_fields, const threading::Field* const* arg_fields) { if ( disabled ) @@ -60,12 +64,11 @@ void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int a if ( initialized ) reporter->InternalError("reader initialize twice"); - info = arg_info; num_fields = arg_num_fields; fields = arg_fields; initialized = true; - backend->SendIn(new InitMessage(backend, info, num_fields, fields)); + backend->SendIn(new InitMessage(backend, num_fields, fields)); } void ReaderFrontend::Update() @@ -82,12 +85,9 @@ void ReaderFrontend::Update() backend->SendIn(new UpdateMessage(backend)); } -string ReaderFrontend::Name() const +const char* ReaderFrontend::Name() const { - if ( ! info.source.size() ) - return ty_name; - - return ty_name + "/" + info.source; + return name; } } diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h index 93e416e65b..a93f7703ac 100644 --- a/src/input/ReaderFrontend.h +++ b/src/input/ReaderFrontend.h @@ -4,10 +4,11 @@ #define INPUT_READERFRONTEND_H #include "ReaderBackend.h" - #include "threading/MsgThread.h" #include "threading/SerialTypes.h" +#include "Val.h" + namespace input { class Manager; @@ -25,6 +26,8 @@ public: /** * Constructor. * + * info: The meta information struct for the writer. + * * type: The backend writer type, with the value corresponding to the * script-level \c Input::Reader enum (e.g., \a READER_ASCII). The * frontend will internally instantiate a ReaderBackend of the @@ -32,7 +35,7 @@ public: * * Frontends must only be instantiated by the main thread. */ - ReaderFrontend(bro_int_t type); + ReaderFrontend(const ReaderBackend::ReaderInfo& info, EnumVal* type); /** * Destructor. @@ -52,7 +55,7 @@ public: * * This method must only be called from the main thread. */ - void Init(const ReaderBackend::ReaderInfo& info, const int arg_num_fields, const threading::Field* const* fields); + void Init(const int arg_num_fields, const threading::Field* const* fields); /** * Force an update of the current input source. Actual action depends @@ -100,12 +103,12 @@ public: * * This method is safe to call from any thread. */ - string Name() const; + const char* Name() const; /** * Returns the additional reader information passed into the constructor. */ - const ReaderBackend::ReaderInfo& Info() const { return info; } + const ReaderBackend::ReaderInfo& Info() const { assert(info); return *info; } /** * Returns the number of log fields as passed into the constructor. @@ -120,24 +123,14 @@ public: protected: friend class Manager; - /** - * Returns the name of the backend's type. - */ - const string& TypeName() const { return ty_name; } - - /** - * Sets the name of the backend's type. - */ - void SetTypeName(const string& name) { ty_name = name; } - private: ReaderBackend* backend; // The backend we have instanatiated. - ReaderBackend::ReaderInfo info; // Meta information as passed to Init(). + ReaderBackend::ReaderInfo* info; // Meta information. const threading::Field* const* fields; // The input fields. int num_fields; // Information as passed to Init(). - string ty_name; // Backend type, set by manager. bool disabled; // True if disabled. bool initialized; // True if initialized. + const char* name; // Descriptive name. }; } diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc index 7f93a3138c..73821d7cb6 100644 --- a/src/input/readers/Ascii.cc +++ b/src/input/readers/Ascii.cc @@ -87,10 +87,10 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f { mtime = 0; - file = new ifstream(info.source.c_str()); + file = new ifstream(info.source); if ( ! file->is_open() ) { - Error(Fmt("Init: cannot open %s", info.source.c_str())); + Error(Fmt("Init: cannot open %s", info.source)); delete(file); file = 0; return false; @@ -98,7 +98,7 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f if ( ReadHeader(false) == false ) { - Error(Fmt("Init: cannot open %s; headers are incorrect", info.source.c_str())); + Error(Fmt("Init: cannot open %s; headers are incorrect", info.source)); file->close(); delete(file); file = 0; @@ -164,20 +164,20 @@ bool Ascii::ReadHeader(bool useCached) } Error(Fmt("Did not find requested field %s in input data file %s.", - field->name.c_str(), Info().source.c_str())); + field->name, Info().source)); return false; } FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]); - if ( field->secondary_name != "" ) + if ( field->secondary_name && strlen(field->secondary_name) != 0 ) { map::iterator fit2 = ifields.find(field->secondary_name); if ( fit2 == ifields.end() ) { Error(Fmt("Could not find requested port type field %s in input data file.", - field->secondary_name.c_str())); + field->secondary_name)); return false; } @@ -220,7 +220,8 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) switch ( field.type ) { case TYPE_ENUM: case TYPE_STRING: - val->val.string_val = new string(s); + val->val.string_val.length = s.size(); + val->val.string_val.data = copy_string(s.c_str()); break; case TYPE_BOOL: @@ -367,9 +368,9 @@ bool Ascii::DoUpdate() { // check if the file has changed struct stat sb; - if ( stat(Info().source.c_str(), &sb) == -1 ) + if ( stat(Info().source, &sb) == -1 ) { - Error(Fmt("Could not get stat for %s", Info().source.c_str())); + Error(Fmt("Could not get stat for %s", Info().source)); return false; } @@ -403,10 +404,10 @@ bool Ascii::DoUpdate() file = 0; } - file = new ifstream(Info().source.c_str()); + file = new ifstream(Info().source); if ( ! file->is_open() ) { - Error(Fmt("cannot open %s", Info().source.c_str())); + Error(Fmt("cannot open %s", Info().source)); return false; } diff --git a/src/input/readers/Benchmark.cc b/src/input/readers/Benchmark.cc index 28afdc1c89..b8cec0f14d 100644 --- a/src/input/readers/Benchmark.cc +++ b/src/input/readers/Benchmark.cc @@ -38,7 +38,7 @@ void Benchmark::DoClose() bool Benchmark::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) { - num_lines = atoi(info.source.c_str()); + num_lines = atoi(info.source); if ( autospread != 0.0 ) autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); @@ -126,8 +126,12 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) assert(false); // no enums, please. case TYPE_STRING: - val->val.string_val = new string(RandomString(10)); + { + string rnd = RandomString(10); + val->val.string_val.data = copy_string(rnd.c_str()); + val->val.string_val.length = rnd.size(); break; + } case TYPE_BOOL: val->val.int_val = 1; // we never lie. diff --git a/src/input/readers/Raw.cc b/src/input/readers/Raw.cc index f62e966883..ac96e5c0f5 100644 --- a/src/input/readers/Raw.cc +++ b/src/input/readers/Raw.cc @@ -108,7 +108,7 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie firstrun = true; bool result; - if ( info.source.length() == 0 ) + if ( ! info.source || strlen(info.source) == 0 ) { Error("No source path provided"); return false; @@ -129,11 +129,12 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie } // do Initialization - char last = info.source[info.source.length()-1]; + string source = string(info.source); + char last = info.source[source.length() - 1]; if ( last == '|' ) { execute = true; - fname = info.source.substr(0, fname.length() - 1); + fname = source.substr(0, fname.length() - 1); if ( (info.mode != MODE_MANUAL) ) { @@ -237,7 +238,8 @@ bool Raw::DoUpdate() // filter has exactly one text field. convert to it. Value* val = new Value(TYPE_STRING, true); - val->val.string_val = new string(line); + val->val.string_val.data = copy_string(line.c_str()); + val->val.string_val.length = line.size(); fields[0] = val; Put(fields); diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 1808b83738..fd970c48b2 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -6,6 +6,7 @@ #include "../EventHandler.h" #include "../NetVar.h" #include "../Net.h" +#include "../Type.h" #include "threading/Manager.h" #include "threading/SerialTypes.h" @@ -75,7 +76,7 @@ struct Manager::WriterInfo { double interval; Func* postprocessor; WriterFrontend* writer; - WriterBackend::WriterInfo info; + WriterBackend::WriterInfo* info; }; struct Manager::Stream { @@ -118,6 +119,7 @@ Manager::Stream::~Stream() Unref(winfo->type); delete winfo->writer; + delete winfo->info; delete winfo; } @@ -193,7 +195,6 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type) assert(ld->factory); - frontend->ty_name = ld->name; WriterBackend* backend = (*ld->factory)(frontend); assert(backend); @@ -476,18 +477,17 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt, return false; } - threading::Field* field = new threading::Field(); - field->name = new_path; - field->type = t->Tag(); - field->optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL); + TypeTag st = TYPE_VOID; - if ( field->type == TYPE_TABLE ) - field->subtype = t->AsSetType()->Indices()->PureType()->Tag(); + if ( t->Tag() == TYPE_TABLE ) + st = t->AsSetType()->Indices()->PureType()->Tag(); - else if ( field->type == TYPE_VECTOR ) - field->subtype = t->AsVectorType()->YieldType()->Tag(); + else if ( t->Tag() == TYPE_VECTOR ) + st = t->AsVectorType()->YieldType()->Tag(); - filter->fields[filter->num_fields - 1] = field; + bool optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL); + + filter->fields[filter->num_fields - 1] = new threading::Field(new_path.c_str(), 0, t->Tag(), st, optional); } return true; @@ -594,7 +594,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval) { threading::Field* field = filter->fields[i]; DBG_LOG(DBG_LOGGING, " field %10s: %s", - field->name.c_str(), type_name(field->type)); + field->name, type_name(field->type)); } #endif @@ -769,9 +769,9 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) for ( int j = 0; j < filter->num_fields; ++j ) arg_fields[j] = new threading::Field(*filter->fields[j]); - WriterBackend::WriterInfo info; - info.path = path; - info.network_time = network_time; + WriterBackend::WriterInfo* info = new WriterBackend::WriterInfo; + info->path = copy_string(path.c_str()); + info->network_time = network_time; HashKey* k; IterCookie* c = filter->config->AsTable()->InitForIteration(); @@ -782,7 +782,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) ListVal* index = filter->config->RecoverIndex(k); string key = index->Index(0)->AsString()->CheckString(); string value = v->Value()->AsString()->CheckString(); - info.config.insert(std::make_pair(key, value)); + info->config.insert(std::make_pair(copy_string(key.c_str()), copy_string(value.c_str()))); Unref(index); delete k; } @@ -844,11 +844,16 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty) val->Type()->AsEnumType()->Lookup(val->InternalInt()); if ( s ) - lval->val.string_val = new string(s); + { + lval->val.string_val.data = copy_string(s); + lval->val.string_val.length = strlen(s); + } + else { val->Type()->Error("enum type does not contain value", val); - lval->val.string_val = new string(); + lval->val.string_val.data = copy_string(""); + lval->val.string_val.length = 0; } break; } @@ -880,15 +885,20 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty) case TYPE_STRING: { const BroString* s = val->AsString(); - lval->val.string_val = - new string((const char*) s->Bytes(), s->Len()); + char* buf = new char[s->Len()]; + memcpy(buf, s->Bytes(), s->Len()); + + lval->val.string_val.data = buf; + lval->val.string_val.length = s->Len(); break; } case TYPE_FILE: { const BroFile* f = val->AsFile(); - lval->val.string_val = new string(f->Name()); + string s = f->Name(); + lval->val.string_val.data = copy_string(s.c_str()); + lval->val.string_val.length = s.size(); break; } @@ -897,7 +907,9 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty) ODesc d; const Func* f = val->AsFunc(); f->Describe(&d); - lval->val.string_val = new string(d.Description()); + const char* s = d.Description(); + lval->val.string_val.data = copy_string(s); + lval->val.string_val.length = strlen(s); break; } @@ -977,7 +989,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, return vals; } -WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info, +WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, int num_fields, const threading::Field* const* fields, bool local, bool remote) { Stream* stream = FindStream(id); @@ -987,7 +999,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer return 0; Stream::WriterMap::iterator w = - stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info.path)); + stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info->path)); if ( w != stream->writers.end() ) // If we already have a writer for this. That's fine, we just @@ -1013,7 +1025,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer { Filter* f = *it; if ( f->writer->AsEnum() == writer->AsEnum() && - f->path == info.path ) + f->path == info->path ) { found_filter_match = true; winfo->interval = f->interval; @@ -1030,7 +1042,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer } stream->writers.insert( - Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info.path), + Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info->path), winfo)); // Still need to set the WriterInfo's rotation parameters, which we @@ -1038,11 +1050,11 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer const char* base_time = log_rotate_base_time ? log_rotate_base_time->AsString()->CheckString() : 0; - winfo->info.rotation_interval = winfo->interval; - winfo->info.rotation_base = parse_rotate_base_time(base_time); + winfo->info->rotation_interval = winfo->interval; + winfo->info->rotation_base = parse_rotate_base_time(base_time); - winfo->writer = new WriterFrontend(id, writer, local, remote); - winfo->writer->Init(winfo->info, num_fields, fields); + winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote); + winfo->writer->Init(num_fields, fields); InstallRotationTimer(winfo); @@ -1124,7 +1136,7 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer) EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer); remote_serializer->SendLogCreateWriter(peer, (*s)->id, &writer_val, - i->second->info, + *i->second->info, writer->NumFields(), writer->Fields()); } @@ -1260,14 +1272,14 @@ void Manager::InstallRotationTimer(WriterInfo* winfo) timer_mgr->Add(winfo->rotation_timer); DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f", - winfo->writer->Name().c_str(), winfo->rotation_timer->Time()); + winfo->writer->Name(), winfo->rotation_timer->Time()); } } void Manager::Rotate(WriterInfo* winfo) { DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", - winfo->writer->Name().c_str(), network_time); + winfo->writer->Name(), network_time); // Build a temporary path for the writer to move the file to. struct tm tm; @@ -1278,15 +1290,14 @@ void Manager::Rotate(WriterInfo* winfo) localtime_r(&teatime, &tm); strftime(buf, sizeof(buf), date_fmt, &tm); - string tmp = string(fmt("%s-%s", winfo->writer->Info().path.c_str(), buf)); - // Trigger the rotation. + const char* tmp = fmt("%s-%s", winfo->writer->Info().path, buf); winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating); ++rotations_pending; } -bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name, +bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name, double open, double close, bool terminating) { --rotations_pending; @@ -1296,7 +1307,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o return true; DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s", - writer->Name().c_str(), network_time, new_name.c_str()); + writer->Name(), network_time, new_name); WriterInfo* winfo = FindWriter(writer); if ( ! winfo ) @@ -1305,8 +1316,8 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o // Create the RotationInfo record. RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo); info->Assign(0, winfo->type->Ref()); - info->Assign(1, new StringVal(new_name.c_str())); - info->Assign(2, new StringVal(winfo->writer->Info().path.c_str())); + info->Assign(1, new StringVal(new_name)); + info->Assign(2, new StringVal(winfo->writer->Info().path)); info->Assign(3, new Val(open, TYPE_TIME)); info->Assign(4, new Val(close, TYPE_TIME)); info->Assign(5, new Val(terminating, TYPE_BOOL)); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 38dd9258b3..ae7a1796ba 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -162,8 +162,8 @@ protected: //// Function also used by the RemoteSerializer. - // Takes ownership of fields. - WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info, + // Takes ownership of fields and info. + WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, int num_fields, const threading::Field* const* fields, bool local, bool remote); @@ -175,7 +175,7 @@ protected: void SendAllWritersTo(RemoteSerializer::PeerID peer); // Signals that a file has been rotated. - bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name, + bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name, double open, double close, bool terminating); // Deletes the values as passed into Write(). diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index a284c56201..8f119d6f8f 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -18,20 +18,26 @@ namespace logging { class RotationFinishedMessage : public threading::OutputMessage { public: - RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name, + RotationFinishedMessage(WriterFrontend* writer, const char* new_name, const char* old_name, double open, double close, bool terminating) : threading::OutputMessage("RotationFinished", writer), - new_name(new_name), old_name(old_name), open(open), + new_name(copy_string(new_name)), old_name(copy_string(old_name)), open(open), close(close), terminating(terminating) { } + virtual ~RotationFinishedMessage() + { + delete [] new_name; + delete [] old_name; + } + virtual bool Process() { return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating); } private: - string new_name; - string old_name; + const char* new_name; + const char* old_name; double open; double close; bool terminating; @@ -65,12 +71,16 @@ bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt) { int size; - if ( ! (fmt->Read(&path, "path") && + string tmp_path; + + if ( ! (fmt->Read(&tmp_path, "path") && fmt->Read(&rotation_base, "rotation_base") && fmt->Read(&rotation_interval, "rotation_interval") && fmt->Read(&size, "config_size")) ) return false; + path = copy_string(tmp_path.c_str()); + config.clear(); while ( size ) @@ -81,7 +91,7 @@ bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt) if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) ) return false; - config.insert(std::make_pair(value, key)); + config.insert(std::make_pair(copy_string(value.c_str()), copy_string(key.c_str()))); } return true; @@ -98,7 +108,7 @@ bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const fmt->Write(size, "config_size")) ) return false; - for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i ) + for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i ) { if ( ! (fmt->Write(i->first, "config-value") && fmt->Write(i->second, "config-key")) ) return false; @@ -113,8 +123,7 @@ WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread() fields = 0; buffering = true; frontend = arg_frontend; - - info.path = ""; + info = new WriterInfo(frontend->Info()); SetName(frontend->Name()); } @@ -128,6 +137,8 @@ WriterBackend::~WriterBackend() delete [] fields; } + + delete info; } void WriterBackend::DeleteVals(int num_writes, Value*** vals) @@ -144,7 +155,7 @@ void WriterBackend::DeleteVals(int num_writes, Value*** vals) delete [] vals; } -bool WriterBackend::FinishedRotation(string new_name, string old_name, +bool WriterBackend::FinishedRotation(const char* new_name, const char* old_name, double open, double close, bool terminating) { SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating)); @@ -156,15 +167,12 @@ void WriterBackend::DisableFrontend() SendOut(new DisableMessage(frontend)); } -bool WriterBackend::Init(const WriterInfo& arg_info, int arg_num_fields, const Field* const* arg_fields, const string& frontend_name) +bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields) { - info = arg_info; num_fields = arg_num_fields; fields = arg_fields; - SetName(frontend->Name()); - - if ( ! DoInit(arg_info, arg_num_fields, arg_fields) ) + if ( ! DoInit(*info, arg_num_fields, arg_fields) ) { DisableFrontend(); return false; @@ -246,7 +254,7 @@ bool WriterBackend::SetBuf(bool enabled) return true; } -bool WriterBackend::Rotate(string rotated_path, double open, +bool WriterBackend::Rotate(const char* rotated_path, double open, double close, bool terminating) { if ( ! DoRotate(rotated_path, open, close, terminating) ) diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index 33cde8679e..a59cd1893e 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -48,14 +48,17 @@ public: */ struct WriterInfo { - typedef std::map config_map; + // Structure takes ownership of these strings. + typedef std::map config_map; /** * A string left to the interpretation of the writer * implementation; it corresponds to the 'path' value configured * on the script-level for the logging filter. + * + * Structure takes ownership of string. */ - string path; + const char* path; /** * The rotation interval as configured for this writer. @@ -76,9 +79,38 @@ public: * A map of key/value pairs corresponding to the relevant * filter's "config" table. */ - std::map config; + config_map config; + + WriterInfo() + { + path = 0; + } + + WriterInfo(const WriterInfo& other) + { + path = other.path ? copy_string(other.path) : 0; + rotation_interval = other.rotation_interval; + rotation_base = other.rotation_base; + network_time = other.network_time; + + for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ ) + config.insert(std::make_pair(copy_string(i->first), copy_string(i->second))); + } + + ~WriterInfo() + { + delete [] path; + + for ( config_map::iterator i = config.begin(); i != config.end(); i++ ) + { + delete [] i->first; + delete [] i->second; + } + } private: + const WriterInfo& operator=(const WriterInfo& other); // Disable. + friend class ::RemoteSerializer; // Note, these need to be adapted when changing the struct's @@ -90,7 +122,6 @@ public: /** * One-time initialization of the writer to define the logged fields. * - * @param info Meta information for the writer. * @param num_fields * * @param fields An array of size \a num_fields with the log fields. @@ -100,7 +131,7 @@ public: * * @return False if an error occured. */ - bool Init(const WriterInfo& info, int num_fields, const threading::Field* const* fields, const string& frontend_name); + bool Init(int num_fields, const threading::Field* const* fields); /** * Writes one log entry. @@ -146,7 +177,7 @@ public: * * @return False if an error occured. */ - bool Rotate(string rotated_path, double open, double close, bool terminating); + bool Rotate(const char* rotated_path, double open, double close, bool terminating); /** * Disables the frontend that has instantiated this backend. Once @@ -157,7 +188,7 @@ public: /** * Returns the additional writer information passed into the constructor. */ - const WriterInfo& Info() const { return info; } + const WriterInfo& Info() const { return *info; } /** * Returns the number of log fields as passed into the constructor. @@ -193,7 +224,7 @@ public: * @param terminating: True if the original rotation request occured * due to the main Bro process shutting down. */ - bool FinishedRotation(string new_name, string old_name, + bool FinishedRotation(const char* new_name, const char* old_name, double open, double close, bool terminating); /** Helper method to render an IP address as a string. @@ -322,7 +353,7 @@ protected: * due the main Bro prcoess terminating (and not because we've * reached a regularly scheduled time for rotation). */ - virtual bool DoRotate(string rotated_path, double open, double close, + virtual bool DoRotate(const char* rotated_path, double open, double close, bool terminating) = 0; /** @@ -351,7 +382,7 @@ private: // this class, it's running in a different thread! WriterFrontend* frontend; - WriterInfo info; // Meta information as passed to Init(). + const WriterInfo* info; // Meta information. int num_fields; // Number of log fields. const threading::Field* const* fields; // Log fields. bool buffering; // True if buffering is enabled. diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index b816327e9c..fc237d6f6e 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -16,35 +16,36 @@ namespace logging { class InitMessage : public threading::InputMessage { public: - InitMessage(WriterBackend* backend, const WriterBackend::WriterInfo& info, const int num_fields, const Field* const* fields, const string& frontend_name) + InitMessage(WriterBackend* backend, const int num_fields, const Field* const* fields) : threading::InputMessage("Init", backend), - info(info), num_fields(num_fields), fields(fields), - frontend_name(frontend_name) { } + num_fields(num_fields), fields(fields) + {} - virtual bool Process() { return Object()->Init(info, num_fields, fields, frontend_name); } + + virtual bool Process() { return Object()->Init(num_fields, fields); } private: - WriterBackend::WriterInfo info; const int num_fields; const Field * const* fields; - const string frontend_name; }; class RotateMessage : public threading::InputMessage { public: - RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const string rotated_path, const double open, + RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const char* rotated_path, const double open, const double close, const bool terminating) : threading::InputMessage("Rotate", backend), frontend(frontend), - rotated_path(rotated_path), open(open), + rotated_path(copy_string(rotated_path)), open(open), close(close), terminating(terminating) { } + virtual ~RotateMessage() { delete [] rotated_path; } + virtual bool Process() { return Object()->Rotate(rotated_path, open, close, terminating); } private: WriterFrontend* frontend; - const string rotated_path; + const char* rotated_path; const double open; const double close; const bool terminating; @@ -96,7 +97,7 @@ private: using namespace logging; -WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) +WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) { stream = arg_stream; writer = arg_writer; @@ -109,7 +110,10 @@ WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool ar remote = arg_remote; write_buffer = 0; write_buffer_pos = 0; - ty_name = ""; + info = new WriterBackend::WriterInfo(arg_info); + + const char* w = arg_writer->Type()->AsEnumType()->Lookup(arg_stream->InternalInt()); + name = copy_string(fmt("%s/%s", arg_info.path, w)); if ( local ) { @@ -127,14 +131,7 @@ WriterFrontend::~WriterFrontend() { Unref(stream); Unref(writer); - } - -string WriterFrontend::Name() const - { - if ( ! info.path.size() ) - return ty_name; - - return ty_name + "/" + info.path; + delete info; } void WriterFrontend::Stop() @@ -143,7 +140,7 @@ void WriterFrontend::Stop() SetDisable(); } -void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields) +void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) { if ( disabled ) return; @@ -151,19 +148,18 @@ void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num if ( initialized ) reporter->InternalError("writer initialize twice"); - info = arg_info; num_fields = arg_num_fields; fields = arg_fields; initialized = true; if ( backend ) - backend->SendIn(new InitMessage(backend, arg_info, arg_num_fields, arg_fields, Name())); + backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields)); if ( remote ) remote_serializer->SendLogCreateWriter(stream, writer, - arg_info, + *info, arg_num_fields, arg_fields); @@ -177,7 +173,7 @@ void WriterFrontend::Write(int num_fields, Value** vals) if ( remote ) remote_serializer->SendLogWrite(stream, writer, - info.path, + info->path, num_fields, vals); @@ -242,7 +238,7 @@ void WriterFrontend::Flush(double network_time) backend->SendIn(new FlushMessage(backend, network_time)); } -void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating) +void WriterFrontend::Rotate(const char* rotated_path, double open, double close, bool terminating) { if ( disabled ) return; diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index e8f3d06d6c..549d602bd5 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -31,6 +31,10 @@ public: * script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The * frontend will internally instantiate a WriterBackend of the * corresponding type. + * + * info: The meta information struct for the writer. + * + * writer_name: A descriptive name for the writer's type. * * local: If true, the writer will instantiate a local backend. * @@ -39,7 +43,7 @@ public: * * Frontends must only be instantiated by the main thread. */ - WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote); + WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote); /** * Destructor. @@ -68,7 +72,7 @@ public: * * This method must only be called from the main thread. */ - void Init(const WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const* fields); + void Init(int num_fields, const threading::Field* const* fields); /** * Write out a record. @@ -130,7 +134,7 @@ public: * * This method must only be called from the main thread. */ - void Rotate(string rotated_path, double open, double close, bool terminating); + void Rotate(const char* rotated_path, double open, double close, bool terminating); /** * Finalizes writing to this tream. @@ -175,7 +179,7 @@ public: /** * Returns the additional writer information as passed into the constructor. */ - const WriterBackend::WriterInfo& Info() const { return info; } + const WriterBackend::WriterInfo& Info() const { return *info; } /** * Returns the number of log fields as passed into the constructor. @@ -188,7 +192,7 @@ public: * * This method is safe to call from any thread. */ - string Name() const; + const char* Name() const { return name; } /** * Returns the log fields as passed into the constructor. @@ -210,8 +214,8 @@ protected: bool local; // True if logging locally. bool remote; // True if loggin remotely. - string ty_name; // Name of the backend type. Set by the manager. - WriterBackend::WriterInfo info; // The writer information. + const char* name; // Descriptive name of the + WriterBackend::WriterInfo* info; // The writer information. int num_fields; // The number of log fields. const threading::Field* const* fields; // The log fields. diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index 20963d1535..99fd3f3c6e 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -52,6 +52,8 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) Ascii::~Ascii() { + //fprintf(stderr, "DTOR %p\n", this); + // Normally, the file will be closed here already via the Finish() // message. But when we terminate abnormally, we may still have it open. if ( fd ) @@ -78,7 +80,10 @@ void Ascii::CloseFile(double t) return; if ( include_meta ) - WriteHeaderField("end", t ? Timestamp(t) : ""); + { + string ts = t ? Timestamp(t) : string(""); + WriteHeaderField("end", ts); + } close(fd); fd = 0; @@ -118,6 +123,8 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * if ( ! safe_write(fd, str.c_str(), str.length()) ) goto write_error; + string ts = Timestamp(info.network_time); + if ( ! (WriteHeaderField("set_separator", get_escaped_string( string(set_separator, set_separator_len), false)) && WriteHeaderField("empty_field", get_escaped_string( @@ -125,8 +132,8 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * WriteHeaderField("unset_field", get_escaped_string( string(unset_field, unset_field_len), false)) && WriteHeaderField("path", get_escaped_string(path, false)) && - WriteHeaderField("start", Timestamp(info.network_time))) ) - goto write_error; + WriteHeaderField("start", ts)) ) + goto write_error; for ( int i = 0; i < num_fields; ++i ) { @@ -136,8 +143,8 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * types += string(separator, separator_len); } - names += fields[i]->name; - types += fields[i]->TypeName(); + names += string(fields[i]->name); + types += fields[i]->TypeName().c_str(); } if ( ! (WriteHeaderField("fields", names) @@ -229,8 +236,8 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field) case TYPE_FILE: case TYPE_FUNC: { - int size = val->val.string_val->size(); - const char* data = val->val.string_val->data(); + int size = val->val.string_val.length; + const char* data = val->val.string_val.data; if ( ! size ) { @@ -311,8 +318,7 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field) } default: - Error(Fmt("unsupported field format %d for %s", val->type, - field->name.c_str())); + Error(Fmt("unsupported field format %d for %s", val->type, field->name)); return false; } @@ -366,7 +372,7 @@ write_error: return false; } -bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating) +bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool terminating) { // Don't rotate special files or if there's not one currently open. if ( ! fd || IsSpecial(Info().path) ) @@ -374,10 +380,10 @@ bool Ascii::DoRotate(string rotated_path, double open, double close, bool termin CloseFile(close); - string nname = rotated_path + "." + LogExt(); + string nname = string(rotated_path) + "." + LogExt(); rename(fname.c_str(), nname.c_str()); - if ( ! FinishedRotation(nname, fname, open, close, terminating) ) + if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) ) { Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str())); return false; @@ -401,19 +407,22 @@ bool Ascii::DoHeartbeat(double network_time, double current_time) string Ascii::LogExt() { const char* ext = getenv("BRO_LOG_SUFFIX"); - if ( ! ext ) ext = "log"; + if ( ! ext ) + ext = "log"; + return ext; } string Ascii::Timestamp(double t) { - struct tm tm; - char buf[128]; - const char* const date_fmt = "%Y-%m-%d-%H-%M-%S"; time_t teatime = time_t(t); - localtime_r(&teatime, &tm); - strftime(buf, sizeof(buf), date_fmt, &tm); + struct tm tmbuf; + struct tm* tm = localtime_r(&teatime, &tmbuf); + + char buf[128]; + const char* const date_fmt = "%Y-%m-%d-%H-%M-%S"; + strftime(buf, sizeof(buf), date_fmt, tm); return buf; } diff --git a/src/logging/writers/Ascii.h b/src/logging/writers/Ascii.h index 371ded4344..cb82860cb7 100644 --- a/src/logging/writers/Ascii.h +++ b/src/logging/writers/Ascii.h @@ -24,7 +24,7 @@ protected: virtual bool DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals); virtual bool DoSetBuf(bool enabled); - virtual bool DoRotate(string rotated_path, double open, + virtual bool DoRotate(const char* rotated_path, double open, double close, bool terminating); virtual bool DoFlush(double network_time); virtual bool DoFinish(double network_time); diff --git a/src/logging/writers/DataSeries.cc b/src/logging/writers/DataSeries.cc index 1978a8b781..7d3053e341 100644 --- a/src/logging/writers/DataSeries.cc +++ b/src/logging/writers/DataSeries.cc @@ -78,10 +78,10 @@ std::string DataSeries::LogValueToString(threading::Value *val) case TYPE_STRING: case TYPE_FILE: case TYPE_FUNC: - if ( ! val->val.string_val->size() ) + if ( ! val->val.string_val.length ) return ""; - return string(val->val.string_val->data(), val->val.string_val->size()); + return string(val->val.string_val.data, val->val.string_val.length); case TYPE_TABLE: { @@ -302,7 +302,8 @@ bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading: if( ds_dump_schema ) { - FILE* pFile = fopen ( string(info.path + ".ds.xml").c_str() , "wb" ); + string name = string(info.path) + ".ds.xml"; + FILE* pFile = fopen(name.c_str(), "wb" ); if( pFile ) { @@ -394,17 +395,17 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields, return true; } -bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating) +bool DataSeries::DoRotate(const char* rotated_path, double open, double close, bool terminating) { // Note that if DS files are rotated too often, the aggregate log // size will be (much) larger. CloseLog(); - string dsname = Info().path + ".ds"; - string nname = rotated_path + ".ds"; + string dsname = string(Info().path) + ".ds"; + string nname = string(rotated_path) + ".ds"; rename(dsname.c_str(), nname.c_str()); - if ( ! FinishedRotation(nname, dsname, open, close, terminating) ) + if ( ! FinishedRotation(nname.c_str(), dsname.c_str(), open, close, terminating) ) { Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str())); return false; diff --git a/src/logging/writers/DataSeries.h b/src/logging/writers/DataSeries.h index 31d17a1a7b..9773c7ce1b 100644 --- a/src/logging/writers/DataSeries.h +++ b/src/logging/writers/DataSeries.h @@ -32,7 +32,7 @@ protected: virtual bool DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals); virtual bool DoSetBuf(bool enabled); - virtual bool DoRotate(string rotated_path, double open, + virtual bool DoRotate(const char* rotated_path, double open, double close, bool terminating); virtual bool DoFlush(double network_time); virtual bool DoFinish(double network_time); diff --git a/src/logging/writers/None.cc b/src/logging/writers/None.cc index acf9355cf7..9b91b82199 100644 --- a/src/logging/writers/None.cc +++ b/src/logging/writers/None.cc @@ -1,4 +1,6 @@ +#include + #include "None.h" #include "NetVar.h" @@ -15,8 +17,17 @@ bool None::DoInit(const WriterInfo& info, int num_fields, std::cout << " rotation_interval=" << info.rotation_interval << std::endl; std::cout << " rotation_base=" << info.rotation_base << std::endl; - for ( std::map::const_iterator i = info.config.begin(); i != info.config.end(); i++ ) - std::cout << " config[" << i->first << "] = " << i->second << std::endl; + // Output the config sorted by keys. + + std::vector > keys; + + for ( WriterInfo::config_map::const_iterator i = info.config.begin(); i != info.config.end(); i++ ) + keys.push_back(std::make_pair(i->first, i->second)); + + std::sort(keys.begin(), keys.end()); + + for ( std::vector >::const_iterator i = keys.begin(); i != keys.end(); i++ ) + std::cout << " config[" << (*i).first << "] = " << (*i).second << std::endl; for ( int i = 0; i < num_fields; i++ ) { @@ -31,11 +42,11 @@ bool None::DoInit(const WriterInfo& info, int num_fields, return true; } -bool None::DoRotate(string rotated_path, double open, double close, bool terminating) +bool None::DoRotate(const char* rotated_path, double open, double close, bool terminating) { - if ( ! FinishedRotation(string("/dev/null"), Info().path, open, close, terminating)) + if ( ! FinishedRotation("/dev/null", Info().path, open, close, terminating)) { - Error(Fmt("error rotating %s", Info().path.c_str())); + Error(Fmt("error rotating %s", Info().path)); return false; } diff --git a/src/logging/writers/None.h b/src/logging/writers/None.h index c6d7cba56a..2a6f71a06a 100644 --- a/src/logging/writers/None.h +++ b/src/logging/writers/None.h @@ -24,7 +24,7 @@ protected: virtual bool DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals) { return true; } virtual bool DoSetBuf(bool enabled) { return true; } - virtual bool DoRotate(string rotated_path, double open, + virtual bool DoRotate(const char* rotated_path, double open, double close, bool terminating); virtual bool DoFlush(double network_time) { return true; } virtual bool DoFinish(double network_time) { return true; } diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index e7fb3f4c84..af57c26939 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -12,18 +12,23 @@ using namespace threading; +static const int STD_FMT_BUF_LEN = 2048; + uint64_t BasicThread::thread_counter = 0; BasicThread::BasicThread() { started = false; terminating = false; + killed = false; pthread = 0; - buf_len = 2048; + buf_len = STD_FMT_BUF_LEN; buf = (char*) malloc(buf_len); - name = Fmt("thread-%d", ++thread_counter); + strerr_buffer = 0; + + name = copy_string(fmt("thread-%" PRIu64, ++thread_counter)); thread_mgr->AddThread(this); } @@ -32,31 +37,41 @@ BasicThread::~BasicThread() { if ( buf ) free(buf); + + delete [] name; + delete [] strerr_buffer; } -void BasicThread::SetName(const string& arg_name) +void BasicThread::SetName(const char* name) { - // Slight race condition here with reader threads, but shouldn't matter. - name = arg_name; + delete [] name; + name = copy_string(name); } -void BasicThread::SetOSName(const string& name) +void BasicThread::SetOSName(const char* name) { #ifdef HAVE_LINUX - prctl(PR_SET_NAME, name.c_str(), 0, 0, 0); + prctl(PR_SET_NAME, name, 0, 0, 0); #endif #ifdef __APPLE__ - pthread_setname_np(name.c_str()); + pthread_setname_np(name); #endif #ifdef FREEBSD - pthread_set_name_np(pthread_self(), name, name.c_str()); + pthread_set_name_np(pthread_self(), name, name); #endif } const char* BasicThread::Fmt(const char* format, ...) { + if ( buf_len > 10 * STD_FMT_BUF_LEN ) + { + // Shrink back to normal. + buf = (char*) safe_realloc(buf, STD_FMT_BUF_LEN); + buf_len = STD_FMT_BUF_LEN; + } + va_list al; va_start(al, format); int n = safe_vsnprintf(buf, buf_len, format, al); @@ -64,15 +79,13 @@ const char* BasicThread::Fmt(const char* format, ...) if ( (unsigned int) n >= buf_len ) { // Not enough room, grow the buffer. - int tmp_len = n + 32; - char* tmp = (char*) malloc(tmp_len); + buf_len = n + 32; + buf = (char*) safe_realloc(buf, buf_len); // Is it portable to restart? va_start(al, format); - n = safe_vsnprintf(tmp, tmp_len, format, al); + n = safe_vsnprintf(buf, buf_len, format, al); va_end(al); - - free(tmp); } return buf; @@ -94,14 +107,14 @@ void BasicThread::Start() int err = pthread_create(&pthread, 0, BasicThread::launcher, this); if ( err != 0 ) - reporter->FatalError("Cannot create thread %s:%s", name.c_str(), Strerror(err)); + reporter->FatalError("Cannot create thread %s: %s", name, Strerror(err)); - DBG_LOG(DBG_THREADING, "Started thread %s", name.c_str()); + DBG_LOG(DBG_THREADING, "Started thread %s", name); OnStart(); } -void BasicThread::Stop() +void BasicThread::PrepareStop() { if ( ! started ) return; @@ -109,11 +122,28 @@ void BasicThread::Stop() if ( terminating ) return; - DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name.c_str()); + DBG_LOG(DBG_THREADING, "Preparing thread %s to terminate ...", name); - terminating = true; + OnPrepareStop(); + } + +void BasicThread::Stop() + { + // XX fprintf(stderr, "stop1 %s %d %d\n", name, started, terminating); + + if ( ! started ) + return; + + if ( terminating ) + return; + + // XX fprintf(stderr, "stop2 %s\n", name); + + DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name); OnStop(); + + terminating = true; } void BasicThread::Join() @@ -123,25 +153,33 @@ void BasicThread::Join() assert(terminating); - DBG_LOG(DBG_THREADING, "Joining thread %s ...", name.c_str()); + DBG_LOG(DBG_THREADING, "Joining thread %s ...", name); if ( pthread && pthread_join(pthread, 0) != 0 ) - reporter->FatalError("Failure joining thread %s", name.c_str()); + reporter->FatalError("Failure joining thread %s", name); - DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str()); + DBG_LOG(DBG_THREADING, "Joined with thread %s", name); pthread = 0; } void BasicThread::Kill() { + // We don't *really* kill the thread here because that leads to race + // conditions. Instead we set a flag that parts of the the code need + // to check and get out of any loops they might be in. terminating = true; + killed = true; + OnKill(); + } - if ( ! (started && pthread) ) - return; +void BasicThread::Done() + { + // XX fprintf(stderr, "DONE from thread %s\n", name); + DBG_LOG(DBG_THREADING, "Thread %s has finished", name); - pthread = 0; - pthread_kill(pthread, SIGTERM); + terminating = true; + killed = true; } void* BasicThread::launcher(void *arg) @@ -161,11 +199,12 @@ void* BasicThread::launcher(void *arg) sigdelset(&mask_set, SIGSEGV); sigdelset(&mask_set, SIGBUS); int res = pthread_sigmask(SIG_BLOCK, &mask_set, 0); - assert(res == 0); // + assert(res == 0); // Run thread's main function. thread->Run(); + thread->Done(); + return 0; } - diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index d47eb5c3c3..037420b077 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -5,7 +5,6 @@ #include #include -#include "Queue.h" #include "util.h" using namespace std; @@ -42,22 +41,25 @@ public: * * This method is safe to call from any thread. */ - const string& Name() const { return name; } + const char* Name() const { return name; } /** * Sets a descriptive name for the thread. This should be a string * that's useful in output presented to the user and uniquely * identifies the thread. * - * This method must be called only from the thread itself. + * This method must be called only from main thread at initialization + * time. */ - void SetName(const string& name); + void SetName(const char* name); /** * Set the name shown by the OS as the thread's description. Not * supported on all OSs. + * + * Must be called only from the child thread. */ - void SetOSName(const string& name); + void SetOSName(const char* name); /** * Starts the thread. Calling this methods will spawn a new OS thread @@ -68,6 +70,18 @@ public: */ void Start(); + /** + * Signals the thread to prepare for stopping. This must be called + * before Stop() and allows the thread to trigger shutting down + * without yet blocking for doing so. + * + * Calling this method has no effect if Start() hasn't been executed + * yet. + * + * Only Bro's main thread must call this method. + */ + void PrepareStop(); + /** * Signals the thread to stop. The method lets Terminating() now * return true. It does however not force the thread to terminate. @@ -88,6 +102,13 @@ public: */ bool Terminating() const { return terminating; } + /** + * Returns true if Kill() has been called. + * + * This method is safe to call from any thread. + */ + bool Killed() const { return killed; } + /** * A version of fmt() that the thread can safely use. * @@ -124,12 +145,24 @@ protected: virtual void OnStart() {} /** - * Executed with Stop(). This is a hook into stopping the thread. It - * will be called from Bro's main thread after the thread has been - * signaled to stop. + * Executed with PrepareStop() (and before OnStop()). This is a hook + * into preparing the thread for stopping. It will be called from + * Bro's main thread before the thread has been signaled to stop. + */ + virtual void OnPrepareStop() {} + + /** + * Executed with Stop() (and after OnPrepareStop()). This is a hook + * into stopping the thread. It will be called from Bro's main thread + * after the thread has been signaled to stop. */ virtual void OnStop() {} + /** + * Executed with Kill(). This is a hook into killing the thread. + */ + virtual void OnKill() {} + /** * Destructor. This will be called by the manager. * @@ -153,14 +186,18 @@ protected: */ void Kill(); + /** Called by child thread's launcher when it's done processing. */ + void Done(); + private: // pthread entry function. static void* launcher(void *arg); - string name; + const char* name; pthread_t pthread; bool started; // Set to to true once running. bool terminating; // Set to to true to signal termination. + bool killed; // Set to true once forcefully killed. // Used as a semaphore to tell the pthread thread when it may // terminate. diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index f1f9307b03..b997aeec47 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -30,6 +30,10 @@ void Manager::Terminate() do Process(); while ( did_process ); // Signal all to stop. + + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) + (*i)->PrepareStop(); + for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) (*i)->Stop(); @@ -50,14 +54,14 @@ void Manager::Terminate() void Manager::AddThread(BasicThread* thread) { - DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name().c_str()); + DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name()); all_threads.push_back(thread); idle = false; } void Manager::AddMsgThread(MsgThread* thread) { - DBG_LOG(DBG_THREADING, "%s is a MsgThread ...", thread->Name().c_str()); + DBG_LOG(DBG_THREADING, "%s is a MsgThread ...", thread->Name()); msg_threads.push_back(thread); } @@ -114,6 +118,12 @@ void Manager::Process() { Message* msg = t->RetrieveOut(); + if ( ! msg ) + { + assert(t->Killed()); + break; + } + if ( msg->Process() ) { if ( network_time ) @@ -122,10 +132,9 @@ void Manager::Process() else { - string s = msg->Name() + " failed, terminating thread"; - reporter->Error("%s", s.c_str()); + reporter->Error("%s failed, terminating thread", msg->Name()); t->Stop(); - } + } delete msg; } diff --git a/src/threading/MsgThread.cc b/src/threading/MsgThread.cc index 3913624654..3e06a3fe1e 100644 --- a/src/threading/MsgThread.cc +++ b/src/threading/MsgThread.cc @@ -29,16 +29,6 @@ private: double network_time; }; -// A dummy message that's only purpose is unblock the current read operation -// so that the child's Run() methods can check the termination status. -class UnblockMessage : public InputMessage -{ -public: - UnblockMessage(MsgThread* thread) : InputMessage("Unblock", thread) { } - - virtual bool Process() { return true; } -}; - /// Sends a heartbeat to the child thread. class HeartbeatMessage : public InputMessage { @@ -66,14 +56,16 @@ public: INTERNAL_WARNING, INTERNAL_ERROR }; - ReporterMessage(Type arg_type, MsgThread* thread, const string& arg_msg) + ReporterMessage(Type arg_type, MsgThread* thread, const char* arg_msg) : OutputMessage("ReporterMessage", thread) - { type = arg_type; msg = arg_msg; } + { type = arg_type; msg = copy_string(arg_msg); } + + ~ReporterMessage() { delete [] msg; } virtual bool Process(); private: - string msg; + const char* msg; Type type; }; @@ -82,18 +74,19 @@ private: class DebugMessage : public OutputMessage { public: - DebugMessage(DebugStream arg_stream, MsgThread* thread, const string& arg_msg) + DebugMessage(DebugStream arg_stream, MsgThread* thread, const char* arg_msg) : OutputMessage("DebugMessage", thread) - { stream = arg_stream; msg = arg_msg; } + { stream = arg_stream; msg = copy_string(arg_msg); } + + virtual ~DebugMessage() { delete [] msg; } virtual bool Process() { - string s = Object()->Name() + ": " + msg; - debug_logger.Log(stream, "%s", s.c_str()); + debug_logger.Log(stream, "%s: %s", Object()->Name(), msg); return true; } private: - string msg; + const char* msg; DebugStream stream; }; #endif @@ -104,41 +97,39 @@ private: Message::~Message() { + delete [] name; } bool ReporterMessage::Process() { - string s = Object()->Name() + ": " + msg; - const char* cmsg = s.c_str(); - switch ( type ) { case INFO: - reporter->Info("%s", cmsg); + reporter->Info("%s: %s", Object()->Name(), msg); break; case WARNING: - reporter->Warning("%s", cmsg); + reporter->Warning("%s: %s", Object()->Name(), msg); break; case ERROR: - reporter->Error("%s", cmsg); + reporter->Error("%s: %s", Object()->Name(), msg); break; case FATAL_ERROR: - reporter->FatalError("%s", cmsg); + reporter->FatalError("%s: %s", Object()->Name(), msg); break; case FATAL_ERROR_WITH_CORE: - reporter->FatalErrorWithCore("%s", cmsg); + reporter->FatalErrorWithCore("%s: %s", Object()->Name(), msg); break; case INTERNAL_WARNING: - reporter->InternalWarning("%s", cmsg); + reporter->InternalWarning("%s: %s", Object()->Name(), msg); break; case INTERNAL_ERROR : - reporter->InternalError("%s", cmsg); + reporter->InternalError("%s: %s", Object()->Name(), msg); break; default: @@ -148,62 +139,78 @@ bool ReporterMessage::Process() return true; } -MsgThread::MsgThread() : BasicThread() +MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this) { cnt_sent_in = cnt_sent_out = 0; finished = false; - stopped = false; thread_mgr->AddMsgThread(this); } // Set by Bro's main signal handler. extern int signal_val; -void MsgThread::OnStop() +void MsgThread::OnPrepareStop() { - if ( stopped ) + if ( finished || Killed() ) return; + // XX fprintf(stderr, "Sending FINISH to thread %s ...\n", Name()); + // Signal thread to terminate and wait until it has acknowledged. SendIn(new FinishMessage(this, network_time), true); + } +void MsgThread::OnStop() + { + int signal_count = 0; int old_signal_val = signal_val; signal_val = 0; int cnt = 0; - bool aborted = 0; + uint64_t last_size = 0; + uint64_t cur_size = 0; - while ( ! finished ) + // XX fprintf(stderr, "WAITING for thread %s to stop ...\n", Name()); + + while ( ! (finished || Killed() ) ) { // Terminate if we get another kill signal. if ( signal_val == SIGTERM || signal_val == SIGINT ) { - // Abort all threads here so that we won't hang next - // on another one. - fprintf(stderr, "received signal while waiting for thread %s, aborting all ...\n", Name().c_str()); - thread_mgr->KillThreads(); - aborted = true; - break; + ++signal_count; + + if ( signal_count == 1 ) + { + // Abort all threads here so that we won't hang next + // on another one. + fprintf(stderr, "received signal while waiting for thread %s, aborting all ...\n", Name()); + thread_mgr->KillThreads(); + } + else + { + // More than one signal. Abort processing + // right away. on another one. + fprintf(stderr, "received another signal while waiting for thread %s, aborting processing\n", Name()); + exit(1); + } + + signal_val = 0; } - if ( ++cnt % 10000 == 0 ) // Insurance against broken threads ... - { - fprintf(stderr, "killing thread %s ...\n", Name().c_str()); - Kill(); - aborted = true; - break; - } + queue_in.WakeUp(); usleep(1000); } - Finished(); - signal_val = old_signal_val; + } - // One more message to make sure the current queue read operation unblocks. - if ( ! aborted ) - SendIn(new UnblockMessage(this), true); +void MsgThread::OnKill() + { + // Send a message to unblock the reader if its currently waiting for + // input. This is just an optimization to make it terminate more + // quickly, even without the message it will eventually time out. + queue_in.WakeUp(); } void MsgThread::Heartbeat() @@ -213,9 +220,7 @@ void MsgThread::Heartbeat() void MsgThread::HeartbeatInChild() { - string n = Name(); - - n = Fmt("bro: %s (%" PRIu64 "/%" PRIu64 ")", n.c_str(), + string n = Fmt("bro: %s (%" PRIu64 "/%" PRIu64 ")", Name(), cnt_sent_in - queue_in.Size(), cnt_sent_out - queue_out.Size()); @@ -283,7 +288,7 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force) return; } - DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str()); + DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name(), Name()); queue_in.Put(msg); ++cnt_sent_in; @@ -306,9 +311,10 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force) BasicOutputMessage* MsgThread::RetrieveOut() { BasicOutputMessage* msg = queue_out.Get(); - assert(msg); + if ( ! msg ) + return 0; - DBG_LOG(DBG_THREADING, "Retrieved '%s' from %s", msg->Name().c_str(), Name().c_str()); + DBG_LOG(DBG_THREADING, "Retrieved '%s' from %s", msg->Name(), Name()); return msg; } @@ -316,10 +322,12 @@ BasicOutputMessage* MsgThread::RetrieveOut() BasicInputMessage* MsgThread::RetrieveIn() { BasicInputMessage* msg = queue_in.Get(); - assert(msg); + + if ( ! msg ) + return 0; #ifdef DEBUG - string s = Fmt("Retrieved '%s' in %s", msg->Name().c_str(), Name().c_str()); + string s = Fmt("Retrieved '%s' in %s", msg->Name(), Name()); Debug(DBG_THREADING, s.c_str()); #endif @@ -328,15 +336,18 @@ BasicInputMessage* MsgThread::RetrieveIn() void MsgThread::Run() { - while ( ! finished ) + while ( ! (finished || Killed() ) ) { BasicInputMessage* msg = RetrieveIn(); + if ( ! msg ) + continue; + bool result = msg->Process(); if ( ! result ) { - string s = msg->Name() + " failed, terminating thread (MsgThread)"; + string s = Fmt("%s failed, terminating thread (MsgThread)", Name()); Error(s.c_str()); break; } @@ -344,7 +355,7 @@ void MsgThread::Run() delete msg; } - Finished(); + Finished(); } void MsgThread::GetStats(Stats* stats) diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index d929c1f806..1d9b17c7d9 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -228,6 +228,8 @@ protected: */ virtual void Run(); virtual void OnStop(); + virtual void OnPrepareStop(); + virtual void OnKill(); private: /** @@ -293,7 +295,6 @@ private: uint64_t cnt_sent_out; // Counts message sent by child. bool finished; // Set to true by Finished message. - bool stopped; // Set to true by OnStop(). }; /** @@ -312,7 +313,7 @@ public: * what's passed into the constructor and used mainly for debugging * purposes. */ - const string& Name() const { return name; } + const char* Name() const { return name; } /** * Callback that must be overriden for processing a message. @@ -326,10 +327,11 @@ protected: * @param arg_name A descriptive name for the type of message. Used * mainly for debugging purposes. */ - Message(const string& arg_name) { name = arg_name; } + Message(const char* arg_name) + { name = copy_string(arg_name); } private: - string name; + const char* name; }; /** @@ -344,7 +346,7 @@ protected: * @param name A descriptive name for the type of message. Used * mainly for debugging purposes. */ - BasicInputMessage(const string& name) : Message(name) {} + BasicInputMessage(const char* name) : Message(name) {} }; /** @@ -359,7 +361,7 @@ protected: * @param name A descriptive name for the type of message. Used * mainly for debugging purposes. */ - BasicOutputMessage(const string& name) : Message(name) {} + BasicOutputMessage(const char* name) : Message(name) {} }; /** @@ -384,7 +386,7 @@ protected: * * @param arg_object: An object to store with the message. */ - InputMessage(const string& name, O* arg_object) : BasicInputMessage(name) + InputMessage(const char* name, O* arg_object) : BasicInputMessage(name) { object = arg_object; } private: @@ -413,7 +415,7 @@ protected: * * @param arg_object An object to store with the message. */ - OutputMessage(const string& name, O* arg_object) : BasicOutputMessage(name) + OutputMessage(const char* name, O* arg_object) : BasicOutputMessage(name) { object = arg_object; } private: diff --git a/src/threading/Queue.h b/src/threading/Queue.h index b2ccd2a0ce..29a8084352 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -1,4 +1,3 @@ - #ifndef THREADING_QUEUE_H #define THREADING_QUEUE_H @@ -6,11 +5,28 @@ #include #include #include +#include #include "Reporter.h" +#include "BasicThread.h" #undef Queue // Defined elsewhere unfortunately. +#if 1 +// We don't have pthread spinlocks on DARWIN. +# define PTHREAD_MUTEX_T pthread_mutex_t +# define PTHREAD_MUTEX_LOCK(x) pthread_mutex_lock(x) +# define PTHREAD_MUTEX_UNLOCK(x) pthread_mutex_unlock(x) +# define PTHREAD_MUTEX_INIT(x) pthread_mutex_init(x, 0) +# define PTHREAD_MUTEX_DESTROY(x) pthread_mutex_destroy(x) +#else +# define PTHREAD_MUTEX_T pthrea_spinlock_T +# define PTHREAD_MUTEX_LOCK(x) pthrea_spin_lock(x) +# define PTHREAD_MUTEX_UNLOCK(x) pthrea_spin_unlock(x) +# define PTHREAD_MUTEX_INIT(x) pthrea_spin_init(x, PTHREAD_PROCESS_PRIVATE) +# define PTHREAD_MUTEX_DESTROY(x) pthrea_spin_destroy(x) +#endif + namespace threading { /** @@ -30,8 +46,12 @@ class Queue public: /** * Constructor. + * + * reader, writer: The corresponding threads. This is for checking + * whether they have terminated so that we can abort I/O opeations. + * Can be left null for the main thread. */ - Queue(); + Queue(BasicThread* arg_reader, BasicThread* arg_writer); /** * Destructor. @@ -39,7 +59,9 @@ public: ~Queue(); /** - * Retrieves one elment. + * Retrieves one elment. This may block for a little while of no + * input is available and eventually return with a null element if + * nothing shows up. */ T Get(); @@ -60,6 +82,11 @@ public: */ bool MaybeReady() { return ( ( read_ptr - write_ptr) != 0 ); } + /** Wake up the reader if it's currently blocked for input. This is + primarily to give it a chance to check termination quickly. + **/ + void WakeUp(); + /** * Returns the number of queued items not yet retrieved. */ @@ -82,45 +109,50 @@ public: void GetStats(Stats* stats); private: - static const int NUM_QUEUES = 8; + static const int NUM_QUEUES = 15; - pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses. + PTHREAD_MUTEX_T mutex[NUM_QUEUES]; // Mutex protected shared accesses. pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available std::queue messages[NUM_QUEUES]; // Actually holds the queued messages int read_ptr; // Where the next operation will read from int write_ptr; // Where the next operation will write to + BasicThread* reader; + BasicThread* writer; + // Statistics. uint64_t num_reads; uint64_t num_writes; }; -inline static void safe_lock(pthread_mutex_t* mutex) +inline static void safe_lock(PTHREAD_MUTEX_T* mutex) { - if ( pthread_mutex_lock(mutex) != 0 ) + if ( PTHREAD_MUTEX_LOCK(mutex) != 0 ) reporter->FatalErrorWithCore("cannot lock mutex"); } -inline static void safe_unlock(pthread_mutex_t* mutex) +inline static void safe_unlock(PTHREAD_MUTEX_T* mutex) { - if ( pthread_mutex_unlock(mutex) != 0 ) + if ( PTHREAD_MUTEX_UNLOCK(mutex) != 0 ) reporter->FatalErrorWithCore("cannot unlock mutex"); } template -inline Queue::Queue() +inline Queue::Queue(BasicThread* arg_reader, BasicThread* arg_writer) { read_ptr = 0; write_ptr = 0; num_reads = num_writes = 0; + reader = arg_reader; + writer = arg_writer; for( int i = 0; i < NUM_QUEUES; ++i ) { - if ( pthread_cond_init(&has_data[i], NULL) != 0 ) + if ( pthread_cond_init(&has_data[i], 0) != 0 ) reporter->FatalError("cannot init queue condition variable"); - if ( pthread_mutex_init(&mutex[i], NULL) != 0 ) + if ( PTHREAD_MUTEX_INIT(&mutex[i]) != 0 ) reporter->FatalError("cannot init queue mutex"); } } @@ -131,19 +163,30 @@ inline Queue::~Queue() for( int i = 0; i < NUM_QUEUES; ++i ) { pthread_cond_destroy(&has_data[i]); - pthread_mutex_destroy(&mutex[i]); + PTHREAD_MUTEX_DESTROY(&mutex[i]); } } template inline T Queue::Get() { + if ( (reader && reader->Killed()) || (writer && writer->Killed()) ) + return 0; + safe_lock(&mutex[read_ptr]); int old_read_ptr = read_ptr; if ( messages[read_ptr].empty() ) - pthread_cond_wait(&has_data[read_ptr], &mutex[read_ptr]); + { + struct timespec ts; + ts.tv_sec = time(0) + 5; + ts.tv_nsec = 0; + + pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts); + safe_unlock(&mutex[read_ptr]); + return 0; + } T data = messages[read_ptr].front(); messages[read_ptr].pop(); @@ -222,6 +265,17 @@ inline void Queue::GetStats(Stats* stats) safe_unlock(&mutex[i]); } +template +inline void Queue::WakeUp() + { + for ( int i = 0; i < NUM_QUEUES; i++ ) + { + safe_lock(&mutex[i]); + pthread_cond_signal(&has_data[i]); + safe_unlock(&mutex[i]); + } + } + } diff --git a/src/threading/SerialTypes.cc b/src/threading/SerialTypes.cc index 4494e1b245..c0e26ccb32 100644 --- a/src/threading/SerialTypes.cc +++ b/src/threading/SerialTypes.cc @@ -11,23 +11,54 @@ bool Field::Read(SerializationFormat* fmt) { int t; int st; + string tmp_name; + bool have_2nd; - bool success = (fmt->Read(&name, "name") - && fmt->Read(&secondary_name, "secondary_name") + if ( ! fmt->Read(&have_2nd, "have_2nd") ) + return false; + + if ( have_2nd ) + { + string tmp_secondary_name; + if ( ! fmt->Read(&tmp_secondary_name, "secondary_name") ) + return false; + + secondary_name = copy_string(tmp_secondary_name.c_str()); + } + else + secondary_name = 0; + + bool success = (fmt->Read(&tmp_name, "name") && fmt->Read(&t, "type") && fmt->Read(&st, "subtype") && fmt->Read(&optional, "optional")); + if ( ! success ) + return false; + + name = copy_string(tmp_name.c_str()); + type = (TypeTag) t; subtype = (TypeTag) st; - return success; + return true; } bool Field::Write(SerializationFormat* fmt) const { + assert(name); + + if ( secondary_name ) + { + if ( ! (fmt->Write(true, "have_2nd") + && fmt->Write(secondary_name, "secondary_name")) ) + return false; + } + else + if ( ! fmt->Write(false, "have_2nd") ) + return false; + return (fmt->Write(name, "name") - && fmt->Write(secondary_name, "secondary_name") && fmt->Write((int)type, "type") && fmt->Write((int)subtype, "subtype"), fmt->Write(optional, "optional")); @@ -51,7 +82,7 @@ Value::~Value() { if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC) && present ) - delete val.string_val; + delete [] val.string_val.data; if ( type == TYPE_TABLE && present ) { @@ -224,10 +255,7 @@ bool Value::Read(SerializationFormat* fmt) case TYPE_STRING: case TYPE_FILE: case TYPE_FUNC: - { - val.string_val = new string; - return fmt->Read(val.string_val, "string"); - } + return fmt->Read(&val.string_val.data, &val.string_val.length, "string"); case TYPE_TABLE: { @@ -339,7 +367,7 @@ bool Value::Write(SerializationFormat* fmt) const case TYPE_STRING: case TYPE_FILE: case TYPE_FUNC: - return fmt->Write(*val.string_val, "string"); + return fmt->Write(val.string_val.data, val.string_val.length, "string"); case TYPE_TABLE: { diff --git a/src/threading/SerialTypes.h b/src/threading/SerialTypes.h index 283d88bf4c..60aee2411e 100644 --- a/src/threading/SerialTypes.h +++ b/src/threading/SerialTypes.h @@ -12,6 +12,7 @@ using namespace std; class SerializationFormat; +class RemoteSerializer; namespace threading { @@ -19,10 +20,10 @@ namespace threading { * Definition of a log file, i.e., one column of a log stream. */ struct Field { - string name; //! Name of the field. + const char* name; //! Name of the field. //! Needed by input framework. Port fields have two names (one for the //! port, one for the type), and this specifies the secondary name. - string secondary_name; + const char* secondary_name; TypeTag type; //! Type of the field. TypeTag subtype; //! Inner type for sets. bool optional; //! True if field is optional. @@ -30,13 +31,24 @@ struct Field { /** * Constructor. */ - Field() { subtype = TYPE_VOID; optional = false; } + Field(const char* name, const char* secondary_name, TypeTag type, TypeTag subtype, bool optional) + : name(name ? copy_string(name) : 0), + secondary_name(secondary_name ? copy_string(secondary_name) : 0), + type(type), subtype(subtype), optional(optional) { } /** * Copy constructor. */ Field(const Field& other) - : name(other.name), type(other.type), subtype(other.subtype), optional(other.optional) { } + : name(other.name ? copy_string(other.name) : 0), + secondary_name(other.secondary_name ? copy_string(other.secondary_name) : 0), + type(other.type), subtype(other.subtype), optional(other.optional) { } + + ~Field() + { + delete [] name; + delete [] secondary_name; + } /** * Unserializes a field. @@ -63,6 +75,12 @@ struct Field { * thread-safe. */ string TypeName() const; + +private: + friend class ::RemoteSerializer; + + // Force usage of constructor above. + Field() {}; }; /** @@ -102,7 +120,11 @@ struct Value { vec_t vector_val; addr_t addr_val; subnet_t subnet_val; - string* string_val; + + struct { + char* data; + int length; + } string_val; } val; /** @@ -147,7 +169,7 @@ struct Value { static bool IsCompatibleType(BroType* t, bool atomic_only=false); private: -friend class ::IPAddr; + friend class ::IPAddr; Value(const Value& other) { } // Disabled. }; diff --git a/testing/btest/istate/events.bro b/testing/btest/istate/events.bro index 1f05dfc729..9298ac1c01 100644 --- a/testing/btest/istate/events.bro +++ b/testing/btest/istate/events.bro @@ -11,8 +11,8 @@ # @TEST-EXEC: cat receiver/http.log $SCRIPTS/diff-remove-timestamps >receiver.http.log # @TEST-EXEC: cmp sender.http.log receiver.http.log # -# @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log -# @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log +# @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' | $SCRIPTS/diff-remove-timestamps >events.snd.log +# @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' | $SCRIPTS/diff-remove-timestamps >events.rec.log # @TEST-EXEC: btest-diff events.rec.log # @TEST-EXEC: btest-diff events.snd.log # @TEST-EXEC: cmp events.rec.log events.snd.log diff --git a/testing/scripts/diff-remove-timestamps b/testing/scripts/diff-remove-timestamps index 2b029789de..9398c1cb4b 100755 --- a/testing/scripts/diff-remove-timestamps +++ b/testing/scripts/diff-remove-timestamps @@ -3,6 +3,4 @@ # Replace anything which looks like timestamps with XXXs (including the #start/end markers in logs). sed 's/[0-9]\{10\}\.[0-9]\{2,8\}/XXXXXXXXXX.XXXXXX/g' | \ -sed 's/^#\(start\|end\).20..-..-..-..-..-..$/#\1 XXXX-XX-XX-XX-XX-XX/g' | \ -grep -v '#start' | grep -v '#end' - +sed 's/^#\(start\|end\).20..-..-..-..-..-..$/#\1 XXXX-XX-XX-XX-XX-XX/g'