Further threading and API restructuring for logging and input

frameworks.

There were a number of cases that weren't thread-safe. In particular,
we don't use std::string anymore for anything that's passed between
threads (but instead plain old const char*, with manual memmory
managmenet).

This is still a check-point commit, I'll do more testing.
This commit is contained in:
Robin Sommer 2012-07-18 12:47:13 -07:00
parent 490859cfef
commit 87e10b5f97
31 changed files with 692 additions and 381 deletions

View file

@ -2692,12 +2692,12 @@ bool RemoteSerializer::ProcessLogCreateWriter()
int id, writer; int id, writer;
int num_fields; int num_fields;
logging::WriterBackend::WriterInfo info; logging::WriterBackend::WriterInfo* info = new logging::WriterBackend::WriterInfo();
bool success = fmt.Read(&id, "id") && bool success = fmt.Read(&id, "id") &&
fmt.Read(&writer, "writer") && fmt.Read(&writer, "writer") &&
fmt.Read(&num_fields, "num_fields") && fmt.Read(&num_fields, "num_fields") &&
info.Read(&fmt); info->Read(&fmt);
if ( ! success ) if ( ! success )
goto error; goto error;

View file

@ -71,7 +71,7 @@ declare(PDict, InputHash);
class Manager::Stream { class Manager::Stream {
public: public:
string name; string name;
ReaderBackend::ReaderInfo info; ReaderBackend::ReaderInfo* info;
bool removed; bool removed;
StreamType stream_type; // to distinguish between event and table streams StreamType stream_type; // to distinguish between event and table streams
@ -257,7 +257,6 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type)
assert(ir->factory); assert(ir->factory);
frontend->SetTypeName(ir->name);
ReaderBackend* backend = (*ir->factory)(frontend); ReaderBackend* backend = (*ir->factory)(frontend);
assert(backend); assert(backend);
@ -291,9 +290,6 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt());
assert(reader_obj);
// get the source ... // get the source ...
Val* sourceval = description->LookupWithDefault(rtype->FieldOffset("source")); Val* sourceval = description->LookupWithDefault(rtype->FieldOffset("source"));
assert ( sourceval != 0 ); assert ( sourceval != 0 );
@ -301,21 +297,22 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
string source((const char*) bsource->Bytes(), bsource->Len()); string source((const char*) bsource->Bytes(), bsource->Len());
Unref(sourceval); Unref(sourceval);
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal(); ReaderBackend::ReaderInfo* rinfo = new ReaderBackend::ReaderInfo();
Val* config = description->LookupWithDefault(rtype->FieldOffset("config")); rinfo->source = copy_string(source.c_str());
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
switch ( mode->InternalInt() ) switch ( mode->InternalInt() )
{ {
case 0: case 0:
info->info.mode = MODE_MANUAL; rinfo->mode = MODE_MANUAL;
break; break;
case 1: case 1:
info->info.mode = MODE_REREAD; rinfo->mode = MODE_REREAD;
break; break;
case 2: case 2:
info->info.mode = MODE_STREAM; rinfo->mode = MODE_STREAM;
break; break;
default: default:
@ -324,12 +321,16 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
Unref(mode); Unref(mode);
Val* config = description->LookupWithDefault(rtype->FieldOffset("config"));
ReaderFrontend* reader_obj = new ReaderFrontend(*rinfo, reader);
assert(reader_obj);
info->reader = reader_obj; info->reader = reader_obj;
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
info->name = name; info->name = name;
info->config = config->AsTableVal(); // ref'd by LookupWithDefault info->config = config->AsTableVal(); // ref'd by LookupWithDefault
info->info = rinfo;
info->info.source = source;
Ref(description); Ref(description);
info->description = description; info->description = description;
@ -344,7 +345,7 @@ bool Manager::CreateStream(Stream* info, RecordVal* description)
ListVal* index = info->config->RecoverIndex(k); ListVal* index = info->config->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString(); string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString(); string value = v->Value()->AsString()->CheckString();
info->info.config.insert(std::make_pair(key, value)); info->info->config.insert(std::make_pair(copy_string(key.c_str()), copy_string(value.c_str())));
Unref(index); Unref(index);
delete k; delete k;
} }
@ -475,7 +476,7 @@ bool Manager::CreateEventStream(RecordVal* fval)
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->info, stream->num_fields, logf ); stream->reader->Init(stream->num_fields, logf );
readers[stream->reader] = stream; readers[stream->reader] = stream;
@ -652,7 +653,7 @@ bool Manager::CreateTableStream(RecordVal* fval)
assert(stream->reader); assert(stream->reader);
stream->reader->Init(stream->info, fieldsV.size(), fields ); stream->reader->Init(fieldsV.size(), fields );
readers[stream->reader] = stream; readers[stream->reader] = stream;
@ -791,17 +792,19 @@ bool Manager::UnrollRecordType(vector<Field*> *fields,
else else
{ {
Field* field = new Field(); string name = nameprepend + rec->FieldName(i);
field->name = nameprepend + rec->FieldName(i); const char* secondary = 0;
field->type = rec->FieldType(i)->Tag(); TypeTag ty = rec->FieldType(i)->Tag();
TypeTag st = TYPE_VOID;
bool optional = false;
if ( field->type == TYPE_TABLE ) if ( ty == TYPE_TABLE )
field->subtype = rec->FieldType(i)->AsSetType()->Indices()->PureType()->Tag(); st = rec->FieldType(i)->AsSetType()->Indices()->PureType()->Tag();
else if ( field->type == TYPE_VECTOR ) else if ( ty == TYPE_VECTOR )
field->subtype = rec->FieldType(i)->AsVectorType()->YieldType()->Tag(); st = rec->FieldType(i)->AsVectorType()->YieldType()->Tag();
else if ( field->type == TYPE_PORT && else if ( ty == TYPE_PORT &&
rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) ) rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) )
{ {
// we have an annotation for the second column // we have an annotation for the second column
@ -811,12 +814,13 @@ bool Manager::UnrollRecordType(vector<Field*> *fields,
assert(c); assert(c);
assert(c->Type()->Tag() == TYPE_STRING); assert(c->Type()->Tag() == TYPE_STRING);
field->secondary_name = c->AsStringVal()->AsString()->CheckString(); secondary = c->AsStringVal()->AsString()->CheckString();
} }
if ( rec->FieldDecl(i)->FindAttr(ATTR_OPTIONAL ) ) if ( rec->FieldDecl(i)->FindAttr(ATTR_OPTIONAL ) )
field->optional = true; optional = true;
Field* field = new Field(name.c_str(), secondary, ty, st, optional);
fields->push_back(field); fields->push_back(field);
} }
} }
@ -1230,7 +1234,7 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
#endif #endif
// Send event that the current update is indeed finished. // Send event that the current update is indeed finished.
SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info.source.c_str())); SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source));
} }
void Manager::Put(ReaderFrontend* reader, Value* *vals) void Manager::Put(ReaderFrontend* reader, Value* *vals)
@ -1707,7 +1711,7 @@ int Manager::GetValueLength(const Value* val) {
case TYPE_STRING: case TYPE_STRING:
case TYPE_ENUM: case TYPE_ENUM:
{ {
length += val->val.string_val->size(); length += val->val.string_val.length;
break; break;
} }
@ -1806,8 +1810,8 @@ int Manager::CopyValue(char *data, const int startpos, const Value* val)
case TYPE_STRING: case TYPE_STRING:
case TYPE_ENUM: case TYPE_ENUM:
{ {
memcpy(data+startpos, val->val.string_val->c_str(), val->val.string_val->length()); memcpy(data+startpos, val->val.string_val.data, val->val.string_val.length);
return val->val.string_val->size(); return val->val.string_val.length;
} }
case TYPE_ADDR: case TYPE_ADDR:
@ -1955,7 +1959,7 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type)
case TYPE_STRING: case TYPE_STRING:
{ {
BroString *s = new BroString(*(val->val.string_val)); BroString *s = new BroString((const u_char*)val->val.string_val.data, val->val.string_val.length, 0);
return new StringVal(s); return new StringVal(s);
} }
@ -2039,8 +2043,8 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type)
case TYPE_ENUM: { case TYPE_ENUM: {
// well, this is kind of stupid, because EnumType just mangles the module name and the var name together again... // well, this is kind of stupid, because EnumType just mangles the module name and the var name together again...
// but well // but well
string module = extract_module_name(val->val.string_val->c_str()); string module = extract_module_name(val->val.string_val.data);
string var = extract_var_name(val->val.string_val->c_str()); string var = extract_var_name(val->val.string_val.data);
bro_int_t index = request_type->AsEnumType()->Lookup(module, var.c_str()); bro_int_t index = request_type->AsEnumType()->Lookup(module, var.c_str());
if ( index == -1 ) if ( index == -1 )
reporter->InternalError("Value not found in enum mappimg. Module: %s, var: %s", reporter->InternalError("Value not found in enum mappimg. Module: %s, var: %s",

View file

@ -56,22 +56,24 @@ private:
class SendEventMessage : public threading::OutputMessage<ReaderFrontend> { class SendEventMessage : public threading::OutputMessage<ReaderFrontend> {
public: public:
SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, Value* *val) SendEventMessage(ReaderFrontend* reader, const char* name, const int num_vals, Value* *val)
: threading::OutputMessage<ReaderFrontend>("SendEvent", reader), : threading::OutputMessage<ReaderFrontend>("SendEvent", reader),
name(name), num_vals(num_vals), val(val) {} name(copy_string(name)), num_vals(num_vals), val(val) {}
virtual ~SendEventMessage() { delete [] name; }
virtual bool Process() virtual bool Process()
{ {
bool success = input_mgr->SendEvent(name, num_vals, val); bool success = input_mgr->SendEvent(name, num_vals, val);
if ( ! success ) if ( ! success )
reporter->Error("SendEvent for event %s failed", name.c_str()); reporter->Error("SendEvent for event %s failed", name);
return true; // We do not want to die if sendEvent fails because the event did not return. return true; // We do not want to die if sendEvent fails because the event did not return.
} }
private: private:
const string name; const char* name;
const int num_vals; const int num_vals;
Value* *val; Value* *val;
}; };
@ -146,12 +148,14 @@ ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{ {
disabled = true; // disabled will be set correcty in init. disabled = true; // disabled will be set correcty in init.
frontend = arg_frontend; frontend = arg_frontend;
info = new ReaderInfo(frontend->Info());
SetName(frontend->Name()); SetName(frontend->Name());
} }
ReaderBackend::~ReaderBackend() ReaderBackend::~ReaderBackend()
{ {
delete info;
} }
void ReaderBackend::Put(Value* *val) void ReaderBackend::Put(Value* *val)
@ -169,7 +173,7 @@ void ReaderBackend::Clear()
SendOut(new ClearMessage(frontend)); SendOut(new ClearMessage(frontend));
} }
void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) void ReaderBackend::SendEvent(const char* name, const int num_vals, Value* *vals)
{ {
SendOut(new SendEventMessage(frontend, name, num_vals, vals)); SendOut(new SendEventMessage(frontend, name, num_vals, vals));
} }
@ -184,17 +188,14 @@ void ReaderBackend::SendEntry(Value* *vals)
SendOut(new SendEntryMessage(frontend, vals)); SendOut(new SendEntryMessage(frontend, vals));
} }
bool ReaderBackend::Init(const ReaderInfo& arg_info, const int arg_num_fields, bool ReaderBackend::Init(const int arg_num_fields,
const threading::Field* const* arg_fields) const threading::Field* const* arg_fields)
{ {
info = arg_info;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
SetName("InputReader/"+info.source);
// disable if DoInit returns error. // disable if DoInit returns error.
int success = DoInit(arg_info, arg_num_fields, arg_fields); int success = DoInit(*info, arg_num_fields, arg_fields);
if ( ! success ) if ( ! success )
{ {

View file

@ -34,7 +34,10 @@ enum ReaderMode {
* for new appended data. When new data is appended is has to be sent * for new appended data. When new data is appended is has to be sent
* using the Put api functions. * using the Put api functions.
*/ */
MODE_STREAM MODE_STREAM,
/** Internal dummy mode for initialization. */
MODE_NONE
}; };
class ReaderFrontend; class ReaderFrontend;
@ -70,14 +73,17 @@ public:
*/ */
struct ReaderInfo struct ReaderInfo
{ {
typedef std::map<string, string> config_map; // Structure takes ownership of the strings.
typedef std::map<const char*, const char*> config_map;
/** /**
* A string left to the interpretation of the reader * A string left to the interpretation of the reader
* implementation; it corresponds to the value configured on * implementation; it corresponds to the value configured on
* the script-level for the logging filter. * the script-level for the logging filter.
*
* Structure takes ownership of the string.
*/ */
string source; const char* source;
/** /**
* A map of key/value pairs corresponding to the relevant * A map of key/value pairs corresponding to the relevant
@ -89,6 +95,35 @@ public:
* The opening mode for the input source. * The opening mode for the input source.
*/ */
ReaderMode mode; ReaderMode mode;
ReaderInfo()
{
source = 0;
mode = MODE_NONE;
}
ReaderInfo(const ReaderInfo& other)
{
source = other.source ? copy_string(other.source) : 0;
mode = other.mode;
for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ )
config.insert(std::make_pair(copy_string(i->first), copy_string(i->second)));
}
~ReaderInfo()
{
delete [] source;
for ( config_map::iterator i = config.begin(); i != config.end(); i++ )
{
delete [] i->first;
delete [] i->second;
}
}
private:
const ReaderInfo& operator=(const ReaderInfo& other); // Disable.
}; };
/** /**
@ -106,7 +141,7 @@ public:
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Init(const ReaderInfo& info, int num_fields, const threading::Field* const* fields); bool Init(int num_fields, const threading::Field* const* fields);
/** /**
* Force trigger an update of the input stream. The action that will * Force trigger an update of the input stream. The action that will
@ -133,7 +168,7 @@ public:
/** /**
* Returns the additional reader information into the constructor. * Returns the additional reader information into the constructor.
*/ */
const ReaderInfo& Info() const { return info; } const ReaderInfo& Info() const { return *info; }
/** /**
* Returns the number of log fields as passed into the constructor. * Returns the number of log fields as passed into the constructor.
@ -209,7 +244,7 @@ protected:
* *
* @param vals the values to be given to the event * @param vals the values to be given to the event
*/ */
void SendEvent(const string& name, const int num_vals, threading::Value* *vals); void SendEvent(const char* name, const int num_vals, threading::Value* *vals);
// Content-sending-functions (simple mode). Include table-specific // Content-sending-functions (simple mode). Include table-specific
// functionality that simply is not used if we have no table. // functionality that simply is not used if we have no table.
@ -291,7 +326,7 @@ private:
// from this class, it's running in a different thread! // from this class, it's running in a different thread!
ReaderFrontend* frontend; ReaderFrontend* frontend;
ReaderInfo info; ReaderInfo* info;
unsigned int num_fields; unsigned int num_fields;
const threading::Field* const * fields; // raw mapping const threading::Field* const * fields; // raw mapping

View file

@ -11,18 +11,17 @@ namespace input {
class InitMessage : public threading::InputMessage<ReaderBackend> class InitMessage : public threading::InputMessage<ReaderBackend>
{ {
public: public:
InitMessage(ReaderBackend* backend, const ReaderBackend::ReaderInfo& info, InitMessage(ReaderBackend* backend,
const int num_fields, const threading::Field* const* fields) const int num_fields, const threading::Field* const* fields)
: threading::InputMessage<ReaderBackend>("Init", backend), : threading::InputMessage<ReaderBackend>("Init", backend),
info(info), num_fields(num_fields), fields(fields) { } num_fields(num_fields), fields(fields) { }
virtual bool Process() virtual bool Process()
{ {
return Object()->Init(info, num_fields, fields); return Object()->Init(num_fields, fields);
} }
private: private:
const ReaderBackend::ReaderInfo info;
const int num_fields; const int num_fields;
const threading::Field* const* fields; const threading::Field* const* fields;
}; };
@ -37,21 +36,26 @@ public:
virtual bool Process() { return Object()->Update(); } virtual bool Process() { return Object()->Update(); }
}; };
ReaderFrontend::ReaderFrontend(bro_int_t type) ReaderFrontend::ReaderFrontend(const ReaderBackend::ReaderInfo& arg_info, EnumVal* type)
{ {
disabled = initialized = false; disabled = initialized = false;
ty_name = "<not set>"; info = new ReaderBackend::ReaderInfo(arg_info);
backend = input_mgr->CreateBackend(this, type);
const char* t = type->Type()->AsEnumType()->Lookup(type->InternalInt());
name = copy_string(fmt("%s/%s", arg_info.source, t));
backend = input_mgr->CreateBackend(this, type->InternalInt());
assert(backend); assert(backend);
backend->Start(); backend->Start();
} }
ReaderFrontend::~ReaderFrontend() ReaderFrontend::~ReaderFrontend()
{ {
delete [] name;
delete info;
} }
void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int arg_num_fields, void ReaderFrontend::Init(const int arg_num_fields,
const threading::Field* const* arg_fields) const threading::Field* const* arg_fields)
{ {
if ( disabled ) if ( disabled )
@ -60,12 +64,11 @@ void ReaderFrontend::Init(const ReaderBackend::ReaderInfo& arg_info, const int a
if ( initialized ) if ( initialized )
reporter->InternalError("reader initialize twice"); reporter->InternalError("reader initialize twice");
info = arg_info;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, info, num_fields, fields)); backend->SendIn(new InitMessage(backend, num_fields, fields));
} }
void ReaderFrontend::Update() void ReaderFrontend::Update()
@ -82,12 +85,9 @@ void ReaderFrontend::Update()
backend->SendIn(new UpdateMessage(backend)); backend->SendIn(new UpdateMessage(backend));
} }
string ReaderFrontend::Name() const const char* ReaderFrontend::Name() const
{ {
if ( ! info.source.size() ) return name;
return ty_name;
return ty_name + "/" + info.source;
} }
} }

View file

@ -4,10 +4,11 @@
#define INPUT_READERFRONTEND_H #define INPUT_READERFRONTEND_H
#include "ReaderBackend.h" #include "ReaderBackend.h"
#include "threading/MsgThread.h" #include "threading/MsgThread.h"
#include "threading/SerialTypes.h" #include "threading/SerialTypes.h"
#include "Val.h"
namespace input { namespace input {
class Manager; class Manager;
@ -25,6 +26,8 @@ public:
/** /**
* Constructor. * Constructor.
* *
* info: The meta information struct for the writer.
*
* type: The backend writer type, with the value corresponding to the * type: The backend writer type, with the value corresponding to the
* script-level \c Input::Reader enum (e.g., \a READER_ASCII). The * script-level \c Input::Reader enum (e.g., \a READER_ASCII). The
* frontend will internally instantiate a ReaderBackend of the * frontend will internally instantiate a ReaderBackend of the
@ -32,7 +35,7 @@ public:
* *
* Frontends must only be instantiated by the main thread. * Frontends must only be instantiated by the main thread.
*/ */
ReaderFrontend(bro_int_t type); ReaderFrontend(const ReaderBackend::ReaderInfo& info, EnumVal* type);
/** /**
* Destructor. * Destructor.
@ -52,7 +55,7 @@ public:
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Init(const ReaderBackend::ReaderInfo& info, const int arg_num_fields, const threading::Field* const* fields); void Init(const int arg_num_fields, const threading::Field* const* fields);
/** /**
* Force an update of the current input source. Actual action depends * Force an update of the current input source. Actual action depends
@ -100,12 +103,12 @@ public:
* *
* This method is safe to call from any thread. * This method is safe to call from any thread.
*/ */
string Name() const; const char* Name() const;
/** /**
* Returns the additional reader information passed into the constructor. * Returns the additional reader information passed into the constructor.
*/ */
const ReaderBackend::ReaderInfo& Info() const { return info; } const ReaderBackend::ReaderInfo& Info() const { assert(info); return *info; }
/** /**
* Returns the number of log fields as passed into the constructor. * Returns the number of log fields as passed into the constructor.
@ -120,24 +123,14 @@ public:
protected: protected:
friend class Manager; friend class Manager;
/**
* Returns the name of the backend's type.
*/
const string& TypeName() const { return ty_name; }
/**
* Sets the name of the backend's type.
*/
void SetTypeName(const string& name) { ty_name = name; }
private: private:
ReaderBackend* backend; // The backend we have instanatiated. ReaderBackend* backend; // The backend we have instanatiated.
ReaderBackend::ReaderInfo info; // Meta information as passed to Init(). ReaderBackend::ReaderInfo* info; // Meta information.
const threading::Field* const* fields; // The input fields. const threading::Field* const* fields; // The input fields.
int num_fields; // Information as passed to Init(). int num_fields; // Information as passed to Init().
string ty_name; // Backend type, set by manager.
bool disabled; // True if disabled. bool disabled; // True if disabled.
bool initialized; // True if initialized. bool initialized; // True if initialized.
const char* name; // Descriptive name.
}; };
} }

View file

@ -87,10 +87,10 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
{ {
mtime = 0; mtime = 0;
file = new ifstream(info.source.c_str()); file = new ifstream(info.source);
if ( ! file->is_open() ) if ( ! file->is_open() )
{ {
Error(Fmt("Init: cannot open %s", info.source.c_str())); Error(Fmt("Init: cannot open %s", info.source));
delete(file); delete(file);
file = 0; file = 0;
return false; return false;
@ -98,7 +98,7 @@ bool Ascii::DoInit(const ReaderInfo& info, int num_fields, const Field* const* f
if ( ReadHeader(false) == false ) if ( ReadHeader(false) == false )
{ {
Error(Fmt("Init: cannot open %s; headers are incorrect", info.source.c_str())); Error(Fmt("Init: cannot open %s; headers are incorrect", info.source));
file->close(); file->close();
delete(file); delete(file);
file = 0; file = 0;
@ -164,20 +164,20 @@ bool Ascii::ReadHeader(bool useCached)
} }
Error(Fmt("Did not find requested field %s in input data file %s.", Error(Fmt("Did not find requested field %s in input data file %s.",
field->name.c_str(), Info().source.c_str())); field->name, Info().source));
return false; return false;
} }
FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]); FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]);
if ( field->secondary_name != "" ) if ( field->secondary_name && strlen(field->secondary_name) != 0 )
{ {
map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name); map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name);
if ( fit2 == ifields.end() ) if ( fit2 == ifields.end() )
{ {
Error(Fmt("Could not find requested port type field %s in input data file.", Error(Fmt("Could not find requested port type field %s in input data file.",
field->secondary_name.c_str())); field->secondary_name));
return false; return false;
} }
@ -220,7 +220,8 @@ Value* Ascii::EntryToVal(string s, FieldMapping field)
switch ( field.type ) { switch ( field.type ) {
case TYPE_ENUM: case TYPE_ENUM:
case TYPE_STRING: case TYPE_STRING:
val->val.string_val = new string(s); val->val.string_val.length = s.size();
val->val.string_val.data = copy_string(s.c_str());
break; break;
case TYPE_BOOL: case TYPE_BOOL:
@ -367,9 +368,9 @@ bool Ascii::DoUpdate()
{ {
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(Info().source.c_str(), &sb) == -1 ) if ( stat(Info().source, &sb) == -1 )
{ {
Error(Fmt("Could not get stat for %s", Info().source.c_str())); Error(Fmt("Could not get stat for %s", Info().source));
return false; return false;
} }
@ -403,10 +404,10 @@ bool Ascii::DoUpdate()
file = 0; file = 0;
} }
file = new ifstream(Info().source.c_str()); file = new ifstream(Info().source);
if ( ! file->is_open() ) if ( ! file->is_open() )
{ {
Error(Fmt("cannot open %s", Info().source.c_str())); Error(Fmt("cannot open %s", Info().source));
return false; return false;
} }

View file

@ -38,7 +38,7 @@ void Benchmark::DoClose()
bool Benchmark::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields) bool Benchmark::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fields)
{ {
num_lines = atoi(info.source.c_str()); num_lines = atoi(info.source);
if ( autospread != 0.0 ) if ( autospread != 0.0 )
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
@ -126,8 +126,12 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype)
assert(false); // no enums, please. assert(false); // no enums, please.
case TYPE_STRING: case TYPE_STRING:
val->val.string_val = new string(RandomString(10)); {
string rnd = RandomString(10);
val->val.string_val.data = copy_string(rnd.c_str());
val->val.string_val.length = rnd.size();
break; break;
}
case TYPE_BOOL: case TYPE_BOOL:
val->val.int_val = 1; // we never lie. val->val.int_val = 1; // we never lie.

View file

@ -108,7 +108,7 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
firstrun = true; firstrun = true;
bool result; bool result;
if ( info.source.length() == 0 ) if ( ! info.source || strlen(info.source) == 0 )
{ {
Error("No source path provided"); Error("No source path provided");
return false; return false;
@ -129,11 +129,12 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
} }
// do Initialization // do Initialization
char last = info.source[info.source.length()-1]; string source = string(info.source);
char last = info.source[source.length() - 1];
if ( last == '|' ) if ( last == '|' )
{ {
execute = true; execute = true;
fname = info.source.substr(0, fname.length() - 1); fname = source.substr(0, fname.length() - 1);
if ( (info.mode != MODE_MANUAL) ) if ( (info.mode != MODE_MANUAL) )
{ {
@ -237,7 +238,8 @@ bool Raw::DoUpdate()
// filter has exactly one text field. convert to it. // filter has exactly one text field. convert to it.
Value* val = new Value(TYPE_STRING, true); Value* val = new Value(TYPE_STRING, true);
val->val.string_val = new string(line); val->val.string_val.data = copy_string(line.c_str());
val->val.string_val.length = line.size();
fields[0] = val; fields[0] = val;
Put(fields); Put(fields);

View file

@ -6,6 +6,7 @@
#include "../EventHandler.h" #include "../EventHandler.h"
#include "../NetVar.h" #include "../NetVar.h"
#include "../Net.h" #include "../Net.h"
#include "../Type.h"
#include "threading/Manager.h" #include "threading/Manager.h"
#include "threading/SerialTypes.h" #include "threading/SerialTypes.h"
@ -75,7 +76,7 @@ struct Manager::WriterInfo {
double interval; double interval;
Func* postprocessor; Func* postprocessor;
WriterFrontend* writer; WriterFrontend* writer;
WriterBackend::WriterInfo info; WriterBackend::WriterInfo* info;
}; };
struct Manager::Stream { struct Manager::Stream {
@ -118,6 +119,7 @@ Manager::Stream::~Stream()
Unref(winfo->type); Unref(winfo->type);
delete winfo->writer; delete winfo->writer;
delete winfo->info;
delete winfo; delete winfo;
} }
@ -193,7 +195,6 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
assert(ld->factory); assert(ld->factory);
frontend->ty_name = ld->name;
WriterBackend* backend = (*ld->factory)(frontend); WriterBackend* backend = (*ld->factory)(frontend);
assert(backend); assert(backend);
@ -476,18 +477,17 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
return false; return false;
} }
threading::Field* field = new threading::Field(); TypeTag st = TYPE_VOID;
field->name = new_path;
field->type = t->Tag();
field->optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
if ( field->type == TYPE_TABLE ) if ( t->Tag() == TYPE_TABLE )
field->subtype = t->AsSetType()->Indices()->PureType()->Tag(); st = t->AsSetType()->Indices()->PureType()->Tag();
else if ( field->type == TYPE_VECTOR ) else if ( t->Tag() == TYPE_VECTOR )
field->subtype = t->AsVectorType()->YieldType()->Tag(); st = t->AsVectorType()->YieldType()->Tag();
filter->fields[filter->num_fields - 1] = field; bool optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
filter->fields[filter->num_fields - 1] = new threading::Field(new_path.c_str(), 0, t->Tag(), st, optional);
} }
return true; return true;
@ -594,7 +594,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
{ {
threading::Field* field = filter->fields[i]; threading::Field* field = filter->fields[i];
DBG_LOG(DBG_LOGGING, " field %10s: %s", DBG_LOG(DBG_LOGGING, " field %10s: %s",
field->name.c_str(), type_name(field->type)); field->name, type_name(field->type));
} }
#endif #endif
@ -769,9 +769,9 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
for ( int j = 0; j < filter->num_fields; ++j ) for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new threading::Field(*filter->fields[j]); arg_fields[j] = new threading::Field(*filter->fields[j]);
WriterBackend::WriterInfo info; WriterBackend::WriterInfo* info = new WriterBackend::WriterInfo;
info.path = path; info->path = copy_string(path.c_str());
info.network_time = network_time; info->network_time = network_time;
HashKey* k; HashKey* k;
IterCookie* c = filter->config->AsTable()->InitForIteration(); IterCookie* c = filter->config->AsTable()->InitForIteration();
@ -782,7 +782,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
ListVal* index = filter->config->RecoverIndex(k); ListVal* index = filter->config->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString(); string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString(); string value = v->Value()->AsString()->CheckString();
info.config.insert(std::make_pair(key, value)); info->config.insert(std::make_pair(copy_string(key.c_str()), copy_string(value.c_str())));
Unref(index); Unref(index);
delete k; delete k;
} }
@ -844,11 +844,16 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
val->Type()->AsEnumType()->Lookup(val->InternalInt()); val->Type()->AsEnumType()->Lookup(val->InternalInt());
if ( s ) if ( s )
lval->val.string_val = new string(s); {
lval->val.string_val.data = copy_string(s);
lval->val.string_val.length = strlen(s);
}
else else
{ {
val->Type()->Error("enum type does not contain value", val); val->Type()->Error("enum type does not contain value", val);
lval->val.string_val = new string(); lval->val.string_val.data = copy_string("");
lval->val.string_val.length = 0;
} }
break; break;
} }
@ -880,15 +885,20 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
case TYPE_STRING: case TYPE_STRING:
{ {
const BroString* s = val->AsString(); const BroString* s = val->AsString();
lval->val.string_val = char* buf = new char[s->Len()];
new string((const char*) s->Bytes(), s->Len()); memcpy(buf, s->Bytes(), s->Len());
lval->val.string_val.data = buf;
lval->val.string_val.length = s->Len();
break; break;
} }
case TYPE_FILE: case TYPE_FILE:
{ {
const BroFile* f = val->AsFile(); const BroFile* f = val->AsFile();
lval->val.string_val = new string(f->Name()); string s = f->Name();
lval->val.string_val.data = copy_string(s.c_str());
lval->val.string_val.length = s.size();
break; break;
} }
@ -897,7 +907,9 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
ODesc d; ODesc d;
const Func* f = val->AsFunc(); const Func* f = val->AsFunc();
f->Describe(&d); f->Describe(&d);
lval->val.string_val = new string(d.Description()); const char* s = d.Description();
lval->val.string_val.data = copy_string(s);
lval->val.string_val.length = strlen(s);
break; break;
} }
@ -977,7 +989,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
return vals; return vals;
} }
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info, WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
int num_fields, const threading::Field* const* fields, bool local, bool remote) int num_fields, const threading::Field* const* fields, bool local, bool remote)
{ {
Stream* stream = FindStream(id); Stream* stream = FindStream(id);
@ -987,7 +999,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer
return 0; return 0;
Stream::WriterMap::iterator w = Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info.path)); stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info->path));
if ( w != stream->writers.end() ) if ( w != stream->writers.end() )
// If we already have a writer for this. That's fine, we just // If we already have a writer for this. That's fine, we just
@ -1013,7 +1025,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer
{ {
Filter* f = *it; Filter* f = *it;
if ( f->writer->AsEnum() == writer->AsEnum() && if ( f->writer->AsEnum() == writer->AsEnum() &&
f->path == info.path ) f->path == info->path )
{ {
found_filter_match = true; found_filter_match = true;
winfo->interval = f->interval; winfo->interval = f->interval;
@ -1030,7 +1042,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer
} }
stream->writers.insert( stream->writers.insert(
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info.path), Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info->path),
winfo)); winfo));
// Still need to set the WriterInfo's rotation parameters, which we // Still need to set the WriterInfo's rotation parameters, which we
@ -1038,11 +1050,11 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer
const char* base_time = log_rotate_base_time ? const char* base_time = log_rotate_base_time ?
log_rotate_base_time->AsString()->CheckString() : 0; log_rotate_base_time->AsString()->CheckString() : 0;
winfo->info.rotation_interval = winfo->interval; winfo->info->rotation_interval = winfo->interval;
winfo->info.rotation_base = parse_rotate_base_time(base_time); winfo->info->rotation_base = parse_rotate_base_time(base_time);
winfo->writer = new WriterFrontend(id, writer, local, remote); winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote);
winfo->writer->Init(winfo->info, num_fields, fields); winfo->writer->Init(num_fields, fields);
InstallRotationTimer(winfo); InstallRotationTimer(winfo);
@ -1124,7 +1136,7 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer); EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id, remote_serializer->SendLogCreateWriter(peer, (*s)->id,
&writer_val, &writer_val,
i->second->info, *i->second->info,
writer->NumFields(), writer->NumFields(),
writer->Fields()); writer->Fields());
} }
@ -1260,14 +1272,14 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
timer_mgr->Add(winfo->rotation_timer); timer_mgr->Add(winfo->rotation_timer);
DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f", DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f",
winfo->writer->Name().c_str(), winfo->rotation_timer->Time()); winfo->writer->Name(), winfo->rotation_timer->Time());
} }
} }
void Manager::Rotate(WriterInfo* winfo) void Manager::Rotate(WriterInfo* winfo)
{ {
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
winfo->writer->Name().c_str(), network_time); winfo->writer->Name(), network_time);
// Build a temporary path for the writer to move the file to. // Build a temporary path for the writer to move the file to.
struct tm tm; struct tm tm;
@ -1278,15 +1290,14 @@ void Manager::Rotate(WriterInfo* winfo)
localtime_r(&teatime, &tm); localtime_r(&teatime, &tm);
strftime(buf, sizeof(buf), date_fmt, &tm); strftime(buf, sizeof(buf), date_fmt, &tm);
string tmp = string(fmt("%s-%s", winfo->writer->Info().path.c_str(), buf));
// Trigger the rotation. // Trigger the rotation.
const char* tmp = fmt("%s-%s", winfo->writer->Info().path, buf);
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating); winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
++rotations_pending; ++rotations_pending;
} }
bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name, bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating) double open, double close, bool terminating)
{ {
--rotations_pending; --rotations_pending;
@ -1296,7 +1307,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
return true; return true;
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s", DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
writer->Name().c_str(), network_time, new_name.c_str()); writer->Name(), network_time, new_name);
WriterInfo* winfo = FindWriter(writer); WriterInfo* winfo = FindWriter(writer);
if ( ! winfo ) if ( ! winfo )
@ -1305,8 +1316,8 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
// Create the RotationInfo record. // Create the RotationInfo record.
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo); RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
info->Assign(0, winfo->type->Ref()); info->Assign(0, winfo->type->Ref());
info->Assign(1, new StringVal(new_name.c_str())); info->Assign(1, new StringVal(new_name));
info->Assign(2, new StringVal(winfo->writer->Info().path.c_str())); info->Assign(2, new StringVal(winfo->writer->Info().path));
info->Assign(3, new Val(open, TYPE_TIME)); info->Assign(3, new Val(open, TYPE_TIME));
info->Assign(4, new Val(close, TYPE_TIME)); info->Assign(4, new Val(close, TYPE_TIME));
info->Assign(5, new Val(terminating, TYPE_BOOL)); info->Assign(5, new Val(terminating, TYPE_BOOL));

View file

@ -162,8 +162,8 @@ protected:
//// Function also used by the RemoteSerializer. //// Function also used by the RemoteSerializer.
// Takes ownership of fields. // Takes ownership of fields and info.
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info, WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
int num_fields, const threading::Field* const* fields, int num_fields, const threading::Field* const* fields,
bool local, bool remote); bool local, bool remote);
@ -175,7 +175,7 @@ protected:
void SendAllWritersTo(RemoteSerializer::PeerID peer); void SendAllWritersTo(RemoteSerializer::PeerID peer);
// Signals that a file has been rotated. // Signals that a file has been rotated.
bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name, bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating); double open, double close, bool terminating);
// Deletes the values as passed into Write(). // Deletes the values as passed into Write().

View file

@ -18,20 +18,26 @@ namespace logging {
class RotationFinishedMessage : public threading::OutputMessage<WriterFrontend> class RotationFinishedMessage : public threading::OutputMessage<WriterFrontend>
{ {
public: public:
RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name, RotationFinishedMessage(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating) double open, double close, bool terminating)
: threading::OutputMessage<WriterFrontend>("RotationFinished", writer), : threading::OutputMessage<WriterFrontend>("RotationFinished", writer),
new_name(new_name), old_name(old_name), open(open), new_name(copy_string(new_name)), old_name(copy_string(old_name)), open(open),
close(close), terminating(terminating) { } close(close), terminating(terminating) { }
virtual ~RotationFinishedMessage()
{
delete [] new_name;
delete [] old_name;
}
virtual bool Process() virtual bool Process()
{ {
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating); return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating);
} }
private: private:
string new_name; const char* new_name;
string old_name; const char* old_name;
double open; double open;
double close; double close;
bool terminating; bool terminating;
@ -65,12 +71,16 @@ bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt)
{ {
int size; int size;
if ( ! (fmt->Read(&path, "path") && string tmp_path;
if ( ! (fmt->Read(&tmp_path, "path") &&
fmt->Read(&rotation_base, "rotation_base") && fmt->Read(&rotation_base, "rotation_base") &&
fmt->Read(&rotation_interval, "rotation_interval") && fmt->Read(&rotation_interval, "rotation_interval") &&
fmt->Read(&size, "config_size")) ) fmt->Read(&size, "config_size")) )
return false; return false;
path = copy_string(tmp_path.c_str());
config.clear(); config.clear();
while ( size ) while ( size )
@ -81,7 +91,7 @@ bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt)
if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) ) if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) )
return false; return false;
config.insert(std::make_pair(value, key)); config.insert(std::make_pair(copy_string(value.c_str()), copy_string(key.c_str())));
} }
return true; return true;
@ -113,8 +123,7 @@ WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
fields = 0; fields = 0;
buffering = true; buffering = true;
frontend = arg_frontend; frontend = arg_frontend;
info = new WriterInfo(frontend->Info());
info.path = "<path not yet set>";
SetName(frontend->Name()); SetName(frontend->Name());
} }
@ -128,6 +137,8 @@ WriterBackend::~WriterBackend()
delete [] fields; delete [] fields;
} }
delete info;
} }
void WriterBackend::DeleteVals(int num_writes, Value*** vals) void WriterBackend::DeleteVals(int num_writes, Value*** vals)
@ -144,7 +155,7 @@ void WriterBackend::DeleteVals(int num_writes, Value*** vals)
delete [] vals; delete [] vals;
} }
bool WriterBackend::FinishedRotation(string new_name, string old_name, bool WriterBackend::FinishedRotation(const char* new_name, const char* old_name,
double open, double close, bool terminating) double open, double close, bool terminating)
{ {
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating)); SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating));
@ -156,15 +167,12 @@ void WriterBackend::DisableFrontend()
SendOut(new DisableMessage(frontend)); SendOut(new DisableMessage(frontend));
} }
bool WriterBackend::Init(const WriterInfo& arg_info, int arg_num_fields, const Field* const* arg_fields, const string& frontend_name) bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields)
{ {
info = arg_info;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
SetName(frontend->Name()); if ( ! DoInit(*info, arg_num_fields, arg_fields) )
if ( ! DoInit(arg_info, arg_num_fields, arg_fields) )
{ {
DisableFrontend(); DisableFrontend();
return false; return false;
@ -246,7 +254,7 @@ bool WriterBackend::SetBuf(bool enabled)
return true; return true;
} }
bool WriterBackend::Rotate(string rotated_path, double open, bool WriterBackend::Rotate(const char* rotated_path, double open,
double close, bool terminating) double close, bool terminating)
{ {
if ( ! DoRotate(rotated_path, open, close, terminating) ) if ( ! DoRotate(rotated_path, open, close, terminating) )

View file

@ -48,14 +48,17 @@ public:
*/ */
struct WriterInfo struct WriterInfo
{ {
typedef std::map<string, string> config_map; // Structure takes ownership of these strings.
typedef std::map<const char*, const char*> config_map;
/** /**
* A string left to the interpretation of the writer * A string left to the interpretation of the writer
* implementation; it corresponds to the 'path' value configured * implementation; it corresponds to the 'path' value configured
* on the script-level for the logging filter. * on the script-level for the logging filter.
*
* Structure takes ownership of string.
*/ */
string path; const char* path;
/** /**
* The rotation interval as configured for this writer. * The rotation interval as configured for this writer.
@ -76,9 +79,38 @@ public:
* A map of key/value pairs corresponding to the relevant * A map of key/value pairs corresponding to the relevant
* filter's "config" table. * filter's "config" table.
*/ */
std::map<string, string> config; config_map config;
WriterInfo()
{
path = 0;
}
WriterInfo(const WriterInfo& other)
{
path = other.path ? copy_string(other.path) : 0;
rotation_interval = other.rotation_interval;
rotation_base = other.rotation_base;
network_time = other.network_time;
for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ )
config.insert(std::make_pair(copy_string(i->first), copy_string(i->second)));
}
~WriterInfo()
{
delete [] path;
for ( config_map::iterator i = config.begin(); i != config.end(); i++ )
{
delete [] i->first;
delete [] i->second;
}
}
private: private:
const WriterInfo& operator=(const WriterInfo& other); // Disable.
friend class ::RemoteSerializer; friend class ::RemoteSerializer;
// Note, these need to be adapted when changing the struct's // Note, these need to be adapted when changing the struct's
@ -90,7 +122,6 @@ public:
/** /**
* One-time initialization of the writer to define the logged fields. * One-time initialization of the writer to define the logged fields.
* *
* @param info Meta information for the writer.
* @param num_fields * @param num_fields
* *
* @param fields An array of size \a num_fields with the log fields. * @param fields An array of size \a num_fields with the log fields.
@ -100,7 +131,7 @@ public:
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Init(const WriterInfo& info, int num_fields, const threading::Field* const* fields, const string& frontend_name); bool Init(int num_fields, const threading::Field* const* fields);
/** /**
* Writes one log entry. * Writes one log entry.
@ -146,7 +177,7 @@ public:
* *
* @return False if an error occured. * @return False if an error occured.
*/ */
bool Rotate(string rotated_path, double open, double close, bool terminating); bool Rotate(const char* rotated_path, double open, double close, bool terminating);
/** /**
* Disables the frontend that has instantiated this backend. Once * Disables the frontend that has instantiated this backend. Once
@ -157,7 +188,7 @@ public:
/** /**
* Returns the additional writer information passed into the constructor. * Returns the additional writer information passed into the constructor.
*/ */
const WriterInfo& Info() const { return info; } const WriterInfo& Info() const { return *info; }
/** /**
* Returns the number of log fields as passed into the constructor. * Returns the number of log fields as passed into the constructor.
@ -193,7 +224,7 @@ public:
* @param terminating: True if the original rotation request occured * @param terminating: True if the original rotation request occured
* due to the main Bro process shutting down. * due to the main Bro process shutting down.
*/ */
bool FinishedRotation(string new_name, string old_name, bool FinishedRotation(const char* new_name, const char* old_name,
double open, double close, bool terminating); double open, double close, bool terminating);
/** Helper method to render an IP address as a string. /** Helper method to render an IP address as a string.
@ -322,7 +353,7 @@ protected:
* due the main Bro prcoess terminating (and not because we've * due the main Bro prcoess terminating (and not because we've
* reached a regularly scheduled time for rotation). * reached a regularly scheduled time for rotation).
*/ */
virtual bool DoRotate(string rotated_path, double open, double close, virtual bool DoRotate(const char* rotated_path, double open, double close,
bool terminating) = 0; bool terminating) = 0;
/** /**
@ -351,7 +382,7 @@ private:
// this class, it's running in a different thread! // this class, it's running in a different thread!
WriterFrontend* frontend; WriterFrontend* frontend;
WriterInfo info; // Meta information as passed to Init(). const WriterInfo* info; // Meta information.
int num_fields; // Number of log fields. int num_fields; // Number of log fields.
const threading::Field* const* fields; // Log fields. const threading::Field* const* fields; // Log fields.
bool buffering; // True if buffering is enabled. bool buffering; // True if buffering is enabled.

View file

@ -16,35 +16,36 @@ namespace logging {
class InitMessage : public threading::InputMessage<WriterBackend> class InitMessage : public threading::InputMessage<WriterBackend>
{ {
public: public:
InitMessage(WriterBackend* backend, const WriterBackend::WriterInfo& info, const int num_fields, const Field* const* fields, const string& frontend_name) InitMessage(WriterBackend* backend, const int num_fields, const Field* const* fields)
: threading::InputMessage<WriterBackend>("Init", backend), : threading::InputMessage<WriterBackend>("Init", backend),
info(info), num_fields(num_fields), fields(fields), num_fields(num_fields), fields(fields)
frontend_name(frontend_name) { } {}
virtual bool Process() { return Object()->Init(info, num_fields, fields, frontend_name); }
virtual bool Process() { return Object()->Init(num_fields, fields); }
private: private:
WriterBackend::WriterInfo info;
const int num_fields; const int num_fields;
const Field * const* fields; const Field * const* fields;
const string frontend_name;
}; };
class RotateMessage : public threading::InputMessage<WriterBackend> class RotateMessage : public threading::InputMessage<WriterBackend>
{ {
public: public:
RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const string rotated_path, const double open, RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const char* rotated_path, const double open,
const double close, const bool terminating) const double close, const bool terminating)
: threading::InputMessage<WriterBackend>("Rotate", backend), : threading::InputMessage<WriterBackend>("Rotate", backend),
frontend(frontend), frontend(frontend),
rotated_path(rotated_path), open(open), rotated_path(copy_string(rotated_path)), open(open),
close(close), terminating(terminating) { } close(close), terminating(terminating) { }
virtual ~RotateMessage() { delete [] rotated_path; }
virtual bool Process() { return Object()->Rotate(rotated_path, open, close, terminating); } virtual bool Process() { return Object()->Rotate(rotated_path, open, close, terminating); }
private: private:
WriterFrontend* frontend; WriterFrontend* frontend;
const string rotated_path; const char* rotated_path;
const double open; const double open;
const double close; const double close;
const bool terminating; const bool terminating;
@ -96,7 +97,7 @@ private:
using namespace logging; using namespace logging;
WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote)
{ {
stream = arg_stream; stream = arg_stream;
writer = arg_writer; writer = arg_writer;
@ -109,7 +110,10 @@ WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool ar
remote = arg_remote; remote = arg_remote;
write_buffer = 0; write_buffer = 0;
write_buffer_pos = 0; write_buffer_pos = 0;
ty_name = "<not set>"; info = new WriterBackend::WriterInfo(arg_info);
const char* w = arg_writer->Type()->AsEnumType()->Lookup(arg_stream->InternalInt());
name = copy_string(fmt("%s/%s", arg_info.path, w));
if ( local ) if ( local )
{ {
@ -127,14 +131,7 @@ WriterFrontend::~WriterFrontend()
{ {
Unref(stream); Unref(stream);
Unref(writer); Unref(writer);
} delete info;
string WriterFrontend::Name() const
{
if ( ! info.path.size() )
return ty_name;
return ty_name + "/" + info.path;
} }
void WriterFrontend::Stop() void WriterFrontend::Stop()
@ -143,7 +140,7 @@ void WriterFrontend::Stop()
SetDisable(); SetDisable();
} }
void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields) void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields)
{ {
if ( disabled ) if ( disabled )
return; return;
@ -151,19 +148,18 @@ void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num
if ( initialized ) if ( initialized )
reporter->InternalError("writer initialize twice"); reporter->InternalError("writer initialize twice");
info = arg_info;
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
initialized = true; initialized = true;
if ( backend ) if ( backend )
backend->SendIn(new InitMessage(backend, arg_info, arg_num_fields, arg_fields, Name())); backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields));
if ( remote ) if ( remote )
remote_serializer->SendLogCreateWriter(stream, remote_serializer->SendLogCreateWriter(stream,
writer, writer,
arg_info, *info,
arg_num_fields, arg_num_fields,
arg_fields); arg_fields);
@ -177,7 +173,7 @@ void WriterFrontend::Write(int num_fields, Value** vals)
if ( remote ) if ( remote )
remote_serializer->SendLogWrite(stream, remote_serializer->SendLogWrite(stream,
writer, writer,
info.path, info->path,
num_fields, num_fields,
vals); vals);
@ -242,7 +238,7 @@ void WriterFrontend::Flush(double network_time)
backend->SendIn(new FlushMessage(backend, network_time)); backend->SendIn(new FlushMessage(backend, network_time));
} }
void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating) void WriterFrontend::Rotate(const char* rotated_path, double open, double close, bool terminating)
{ {
if ( disabled ) if ( disabled )
return; return;

View file

@ -32,6 +32,10 @@ public:
* frontend will internally instantiate a WriterBackend of the * frontend will internally instantiate a WriterBackend of the
* corresponding type. * corresponding type.
* *
* info: The meta information struct for the writer.
*
* writer_name: A descriptive name for the writer's type.
*
* local: If true, the writer will instantiate a local backend. * local: If true, the writer will instantiate a local backend.
* *
* remote: If true, the writer will forward all data to remote * remote: If true, the writer will forward all data to remote
@ -39,7 +43,7 @@ public:
* *
* Frontends must only be instantiated by the main thread. * Frontends must only be instantiated by the main thread.
*/ */
WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote); WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote);
/** /**
* Destructor. * Destructor.
@ -68,7 +72,7 @@ public:
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Init(const WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const* fields); void Init(int num_fields, const threading::Field* const* fields);
/** /**
* Write out a record. * Write out a record.
@ -130,7 +134,7 @@ public:
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */
void Rotate(string rotated_path, double open, double close, bool terminating); void Rotate(const char* rotated_path, double open, double close, bool terminating);
/** /**
* Finalizes writing to this tream. * Finalizes writing to this tream.
@ -175,7 +179,7 @@ public:
/** /**
* Returns the additional writer information as passed into the constructor. * Returns the additional writer information as passed into the constructor.
*/ */
const WriterBackend::WriterInfo& Info() const { return info; } const WriterBackend::WriterInfo& Info() const { return *info; }
/** /**
* Returns the number of log fields as passed into the constructor. * Returns the number of log fields as passed into the constructor.
@ -188,7 +192,7 @@ public:
* *
* This method is safe to call from any thread. * This method is safe to call from any thread.
*/ */
string Name() const; const char* Name() const { return name; }
/** /**
* Returns the log fields as passed into the constructor. * Returns the log fields as passed into the constructor.
@ -210,8 +214,8 @@ protected:
bool local; // True if logging locally. bool local; // True if logging locally.
bool remote; // True if loggin remotely. bool remote; // True if loggin remotely.
string ty_name; // Name of the backend type. Set by the manager. const char* name; // Descriptive name of the
WriterBackend::WriterInfo info; // The writer information. WriterBackend::WriterInfo* info; // The writer information.
int num_fields; // The number of log fields. int num_fields; // The number of log fields.
const threading::Field* const* fields; // The log fields. const threading::Field* const* fields; // The log fields.

View file

@ -52,6 +52,8 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
Ascii::~Ascii() Ascii::~Ascii()
{ {
//fprintf(stderr, "DTOR %p\n", this);
// Normally, the file will be closed here already via the Finish() // Normally, the file will be closed here already via the Finish()
// message. But when we terminate abnormally, we may still have it open. // message. But when we terminate abnormally, we may still have it open.
if ( fd ) if ( fd )
@ -78,7 +80,10 @@ void Ascii::CloseFile(double t)
return; return;
if ( include_meta ) if ( include_meta )
WriteHeaderField("end", t ? Timestamp(t) : "<abnormal termination>"); {
string ts = t ? Timestamp(t) : string("<abnormal termination>");
WriteHeaderField("end", ts);
}
close(fd); close(fd);
fd = 0; fd = 0;
@ -118,6 +123,8 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
if ( ! safe_write(fd, str.c_str(), str.length()) ) if ( ! safe_write(fd, str.c_str(), str.length()) )
goto write_error; goto write_error;
string ts = Timestamp(info.network_time);
if ( ! (WriteHeaderField("set_separator", get_escaped_string( if ( ! (WriteHeaderField("set_separator", get_escaped_string(
string(set_separator, set_separator_len), false)) && string(set_separator, set_separator_len), false)) &&
WriteHeaderField("empty_field", get_escaped_string( WriteHeaderField("empty_field", get_escaped_string(
@ -125,7 +132,7 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
WriteHeaderField("unset_field", get_escaped_string( WriteHeaderField("unset_field", get_escaped_string(
string(unset_field, unset_field_len), false)) && string(unset_field, unset_field_len), false)) &&
WriteHeaderField("path", get_escaped_string(path, false)) && WriteHeaderField("path", get_escaped_string(path, false)) &&
WriteHeaderField("start", Timestamp(info.network_time))) ) WriteHeaderField("start", ts)) )
goto write_error; goto write_error;
for ( int i = 0; i < num_fields; ++i ) for ( int i = 0; i < num_fields; ++i )
@ -136,8 +143,8 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
types += string(separator, separator_len); types += string(separator, separator_len);
} }
names += fields[i]->name; names += string(fields[i]->name);
types += fields[i]->TypeName(); types += fields[i]->TypeName().c_str();
} }
if ( ! (WriteHeaderField("fields", names) if ( ! (WriteHeaderField("fields", names)
@ -229,8 +236,8 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
case TYPE_FILE: case TYPE_FILE:
case TYPE_FUNC: case TYPE_FUNC:
{ {
int size = val->val.string_val->size(); int size = val->val.string_val.length;
const char* data = val->val.string_val->data(); const char* data = val->val.string_val.data;
if ( ! size ) if ( ! size )
{ {
@ -311,8 +318,7 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
} }
default: default:
Error(Fmt("unsupported field format %d for %s", val->type, Error(Fmt("unsupported field format %d for %s", val->type, field->name));
field->name.c_str()));
return false; return false;
} }
@ -366,7 +372,7 @@ write_error:
return false; return false;
} }
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating) bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool terminating)
{ {
// Don't rotate special files or if there's not one currently open. // Don't rotate special files or if there's not one currently open.
if ( ! fd || IsSpecial(Info().path) ) if ( ! fd || IsSpecial(Info().path) )
@ -374,10 +380,10 @@ bool Ascii::DoRotate(string rotated_path, double open, double close, bool termin
CloseFile(close); CloseFile(close);
string nname = rotated_path + "." + LogExt(); string nname = string(rotated_path) + "." + LogExt();
rename(fname.c_str(), nname.c_str()); rename(fname.c_str(), nname.c_str());
if ( ! FinishedRotation(nname, fname, open, close, terminating) ) if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) )
{ {
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str())); Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
return false; return false;
@ -401,19 +407,22 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
string Ascii::LogExt() string Ascii::LogExt()
{ {
const char* ext = getenv("BRO_LOG_SUFFIX"); const char* ext = getenv("BRO_LOG_SUFFIX");
if ( ! ext ) ext = "log"; if ( ! ext )
ext = "log";
return ext; return ext;
} }
string Ascii::Timestamp(double t) string Ascii::Timestamp(double t)
{ {
struct tm tm;
char buf[128];
const char* const date_fmt = "%Y-%m-%d-%H-%M-%S";
time_t teatime = time_t(t); time_t teatime = time_t(t);
localtime_r(&teatime, &tm); struct tm tmbuf;
strftime(buf, sizeof(buf), date_fmt, &tm); struct tm* tm = localtime_r(&teatime, &tmbuf);
char buf[128];
const char* const date_fmt = "%Y-%m-%d-%H-%M-%S";
strftime(buf, sizeof(buf), date_fmt, tm);
return buf; return buf;
} }

View file

@ -24,7 +24,7 @@ protected:
virtual bool DoWrite(int num_fields, const threading::Field* const* fields, virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals); threading::Value** vals);
virtual bool DoSetBuf(bool enabled); virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(string rotated_path, double open, virtual bool DoRotate(const char* rotated_path, double open,
double close, bool terminating); double close, bool terminating);
virtual bool DoFlush(double network_time); virtual bool DoFlush(double network_time);
virtual bool DoFinish(double network_time); virtual bool DoFinish(double network_time);

View file

@ -78,10 +78,10 @@ std::string DataSeries::LogValueToString(threading::Value *val)
case TYPE_STRING: case TYPE_STRING:
case TYPE_FILE: case TYPE_FILE:
case TYPE_FUNC: case TYPE_FUNC:
if ( ! val->val.string_val->size() ) if ( ! val->val.string_val.length )
return ""; return "";
return string(val->val.string_val->data(), val->val.string_val->size()); return string(val->val.string_val.data, val->val.string_val.length);
case TYPE_TABLE: case TYPE_TABLE:
{ {
@ -302,7 +302,8 @@ bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading:
if( ds_dump_schema ) if( ds_dump_schema )
{ {
FILE* pFile = fopen ( string(info.path + ".ds.xml").c_str() , "wb" ); string name = string(info.path) + ".ds.xml";
FILE* pFile = fopen(name.c_str(), "wb" );
if( pFile ) if( pFile )
{ {
@ -394,17 +395,17 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
return true; return true;
} }
bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating) bool DataSeries::DoRotate(const char* rotated_path, double open, double close, bool terminating)
{ {
// Note that if DS files are rotated too often, the aggregate log // Note that if DS files are rotated too often, the aggregate log
// size will be (much) larger. // size will be (much) larger.
CloseLog(); CloseLog();
string dsname = Info().path + ".ds"; string dsname = string(Info().path) + ".ds";
string nname = rotated_path + ".ds"; string nname = string(rotated_path) + ".ds";
rename(dsname.c_str(), nname.c_str()); rename(dsname.c_str(), nname.c_str());
if ( ! FinishedRotation(nname, dsname, open, close, terminating) ) if ( ! FinishedRotation(nname.c_str(), dsname.c_str(), open, close, terminating) )
{ {
Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str())); Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str()));
return false; return false;

View file

@ -32,7 +32,7 @@ protected:
virtual bool DoWrite(int num_fields, const threading::Field* const* fields, virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals); threading::Value** vals);
virtual bool DoSetBuf(bool enabled); virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(string rotated_path, double open, virtual bool DoRotate(const char* rotated_path, double open,
double close, bool terminating); double close, bool terminating);
virtual bool DoFlush(double network_time); virtual bool DoFlush(double network_time);
virtual bool DoFinish(double network_time); virtual bool DoFinish(double network_time);

View file

@ -1,4 +1,6 @@
#include <algorithm>
#include "None.h" #include "None.h"
#include "NetVar.h" #include "NetVar.h"
@ -15,8 +17,17 @@ bool None::DoInit(const WriterInfo& info, int num_fields,
std::cout << " rotation_interval=" << info.rotation_interval << std::endl; std::cout << " rotation_interval=" << info.rotation_interval << std::endl;
std::cout << " rotation_base=" << info.rotation_base << std::endl; std::cout << " rotation_base=" << info.rotation_base << std::endl;
for ( std::map<string,string>::const_iterator i = info.config.begin(); i != info.config.end(); i++ ) // Output the config sorted by keys.
std::cout << " config[" << i->first << "] = " << i->second << std::endl;
std::vector<std::pair<string, string> > keys;
for ( WriterInfo::config_map::const_iterator i = info.config.begin(); i != info.config.end(); i++ )
keys.push_back(std::make_pair(i->first, i->second));
std::sort(keys.begin(), keys.end());
for ( std::vector<std::pair<string,string> >::const_iterator i = keys.begin(); i != keys.end(); i++ )
std::cout << " config[" << (*i).first << "] = " << (*i).second << std::endl;
for ( int i = 0; i < num_fields; i++ ) for ( int i = 0; i < num_fields; i++ )
{ {
@ -31,11 +42,11 @@ bool None::DoInit(const WriterInfo& info, int num_fields,
return true; return true;
} }
bool None::DoRotate(string rotated_path, double open, double close, bool terminating) bool None::DoRotate(const char* rotated_path, double open, double close, bool terminating)
{ {
if ( ! FinishedRotation(string("/dev/null"), Info().path, open, close, terminating)) if ( ! FinishedRotation("/dev/null", Info().path, open, close, terminating))
{ {
Error(Fmt("error rotating %s", Info().path.c_str())); Error(Fmt("error rotating %s", Info().path));
return false; return false;
} }

View file

@ -24,7 +24,7 @@ protected:
virtual bool DoWrite(int num_fields, const threading::Field* const* fields, virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals) { return true; } threading::Value** vals) { return true; }
virtual bool DoSetBuf(bool enabled) { return true; } virtual bool DoSetBuf(bool enabled) { return true; }
virtual bool DoRotate(string rotated_path, double open, virtual bool DoRotate(const char* rotated_path, double open,
double close, bool terminating); double close, bool terminating);
virtual bool DoFlush(double network_time) { return true; } virtual bool DoFlush(double network_time) { return true; }
virtual bool DoFinish(double network_time) { return true; } virtual bool DoFinish(double network_time) { return true; }

View file

@ -12,18 +12,23 @@
using namespace threading; using namespace threading;
static const int STD_FMT_BUF_LEN = 2048;
uint64_t BasicThread::thread_counter = 0; uint64_t BasicThread::thread_counter = 0;
BasicThread::BasicThread() BasicThread::BasicThread()
{ {
started = false; started = false;
terminating = false; terminating = false;
killed = false;
pthread = 0; pthread = 0;
buf_len = 2048; buf_len = STD_FMT_BUF_LEN;
buf = (char*) malloc(buf_len); buf = (char*) malloc(buf_len);
name = Fmt("thread-%d", ++thread_counter); strerr_buffer = 0;
name = copy_string(fmt("thread-%" PRIu64, ++thread_counter));
thread_mgr->AddThread(this); thread_mgr->AddThread(this);
} }
@ -32,31 +37,41 @@ BasicThread::~BasicThread()
{ {
if ( buf ) if ( buf )
free(buf); free(buf);
delete [] name;
delete [] strerr_buffer;
} }
void BasicThread::SetName(const string& arg_name) void BasicThread::SetName(const char* name)
{ {
// Slight race condition here with reader threads, but shouldn't matter. delete [] name;
name = arg_name; name = copy_string(name);
} }
void BasicThread::SetOSName(const string& name) void BasicThread::SetOSName(const char* name)
{ {
#ifdef HAVE_LINUX #ifdef HAVE_LINUX
prctl(PR_SET_NAME, name.c_str(), 0, 0, 0); prctl(PR_SET_NAME, name, 0, 0, 0);
#endif #endif
#ifdef __APPLE__ #ifdef __APPLE__
pthread_setname_np(name.c_str()); pthread_setname_np(name);
#endif #endif
#ifdef FREEBSD #ifdef FREEBSD
pthread_set_name_np(pthread_self(), name, name.c_str()); pthread_set_name_np(pthread_self(), name, name);
#endif #endif
} }
const char* BasicThread::Fmt(const char* format, ...) const char* BasicThread::Fmt(const char* format, ...)
{ {
if ( buf_len > 10 * STD_FMT_BUF_LEN )
{
// Shrink back to normal.
buf = (char*) safe_realloc(buf, STD_FMT_BUF_LEN);
buf_len = STD_FMT_BUF_LEN;
}
va_list al; va_list al;
va_start(al, format); va_start(al, format);
int n = safe_vsnprintf(buf, buf_len, format, al); int n = safe_vsnprintf(buf, buf_len, format, al);
@ -64,15 +79,13 @@ const char* BasicThread::Fmt(const char* format, ...)
if ( (unsigned int) n >= buf_len ) if ( (unsigned int) n >= buf_len )
{ // Not enough room, grow the buffer. { // Not enough room, grow the buffer.
int tmp_len = n + 32; buf_len = n + 32;
char* tmp = (char*) malloc(tmp_len); buf = (char*) safe_realloc(buf, buf_len);
// Is it portable to restart? // Is it portable to restart?
va_start(al, format); va_start(al, format);
n = safe_vsnprintf(tmp, tmp_len, format, al); n = safe_vsnprintf(buf, buf_len, format, al);
va_end(al); va_end(al);
free(tmp);
} }
return buf; return buf;
@ -94,14 +107,14 @@ void BasicThread::Start()
int err = pthread_create(&pthread, 0, BasicThread::launcher, this); int err = pthread_create(&pthread, 0, BasicThread::launcher, this);
if ( err != 0 ) if ( err != 0 )
reporter->FatalError("Cannot create thread %s:%s", name.c_str(), Strerror(err)); reporter->FatalError("Cannot create thread %s: %s", name, Strerror(err));
DBG_LOG(DBG_THREADING, "Started thread %s", name.c_str()); DBG_LOG(DBG_THREADING, "Started thread %s", name);
OnStart(); OnStart();
} }
void BasicThread::Stop() void BasicThread::PrepareStop()
{ {
if ( ! started ) if ( ! started )
return; return;
@ -109,11 +122,28 @@ void BasicThread::Stop()
if ( terminating ) if ( terminating )
return; return;
DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name.c_str()); DBG_LOG(DBG_THREADING, "Preparing thread %s to terminate ...", name);
terminating = true; OnPrepareStop();
}
void BasicThread::Stop()
{
// XX fprintf(stderr, "stop1 %s %d %d\n", name, started, terminating);
if ( ! started )
return;
if ( terminating )
return;
// XX fprintf(stderr, "stop2 %s\n", name);
DBG_LOG(DBG_THREADING, "Signaling thread %s to terminate ...", name);
OnStop(); OnStop();
terminating = true;
} }
void BasicThread::Join() void BasicThread::Join()
@ -123,25 +153,33 @@ void BasicThread::Join()
assert(terminating); assert(terminating);
DBG_LOG(DBG_THREADING, "Joining thread %s ...", name.c_str()); DBG_LOG(DBG_THREADING, "Joining thread %s ...", name);
if ( pthread && pthread_join(pthread, 0) != 0 ) if ( pthread && pthread_join(pthread, 0) != 0 )
reporter->FatalError("Failure joining thread %s", name.c_str()); reporter->FatalError("Failure joining thread %s", name);
DBG_LOG(DBG_THREADING, "Done with thread %s", name.c_str()); DBG_LOG(DBG_THREADING, "Joined with thread %s", name);
pthread = 0; pthread = 0;
} }
void BasicThread::Kill() void BasicThread::Kill()
{ {
// We don't *really* kill the thread here because that leads to race
// conditions. Instead we set a flag that parts of the the code need
// to check and get out of any loops they might be in.
terminating = true; terminating = true;
killed = true;
OnKill();
}
if ( ! (started && pthread) ) void BasicThread::Done()
return; {
// XX fprintf(stderr, "DONE from thread %s\n", name);
DBG_LOG(DBG_THREADING, "Thread %s has finished", name);
pthread = 0; terminating = true;
pthread_kill(pthread, SIGTERM); killed = true;
} }
void* BasicThread::launcher(void *arg) void* BasicThread::launcher(void *arg)
@ -161,11 +199,12 @@ void* BasicThread::launcher(void *arg)
sigdelset(&mask_set, SIGSEGV); sigdelset(&mask_set, SIGSEGV);
sigdelset(&mask_set, SIGBUS); sigdelset(&mask_set, SIGBUS);
int res = pthread_sigmask(SIG_BLOCK, &mask_set, 0); int res = pthread_sigmask(SIG_BLOCK, &mask_set, 0);
assert(res == 0); // assert(res == 0);
// Run thread's main function. // Run thread's main function.
thread->Run(); thread->Run();
thread->Done();
return 0; return 0;
} }

View file

@ -5,7 +5,6 @@
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
#include "Queue.h"
#include "util.h" #include "util.h"
using namespace std; using namespace std;
@ -42,22 +41,25 @@ public:
* *
* This method is safe to call from any thread. * This method is safe to call from any thread.
*/ */
const string& Name() const { return name; } const char* Name() const { return name; }
/** /**
* Sets a descriptive name for the thread. This should be a string * Sets a descriptive name for the thread. This should be a string
* that's useful in output presented to the user and uniquely * that's useful in output presented to the user and uniquely
* identifies the thread. * identifies the thread.
* *
* This method must be called only from the thread itself. * This method must be called only from main thread at initialization
* time.
*/ */
void SetName(const string& name); void SetName(const char* name);
/** /**
* Set the name shown by the OS as the thread's description. Not * Set the name shown by the OS as the thread's description. Not
* supported on all OSs. * supported on all OSs.
*
* Must be called only from the child thread.
*/ */
void SetOSName(const string& name); void SetOSName(const char* name);
/** /**
* Starts the thread. Calling this methods will spawn a new OS thread * Starts the thread. Calling this methods will spawn a new OS thread
@ -68,6 +70,18 @@ public:
*/ */
void Start(); void Start();
/**
* Signals the thread to prepare for stopping. This must be called
* before Stop() and allows the thread to trigger shutting down
* without yet blocking for doing so.
*
* Calling this method has no effect if Start() hasn't been executed
* yet.
*
* Only Bro's main thread must call this method.
*/
void PrepareStop();
/** /**
* Signals the thread to stop. The method lets Terminating() now * Signals the thread to stop. The method lets Terminating() now
* return true. It does however not force the thread to terminate. * return true. It does however not force the thread to terminate.
@ -88,6 +102,13 @@ public:
*/ */
bool Terminating() const { return terminating; } bool Terminating() const { return terminating; }
/**
* Returns true if Kill() has been called.
*
* This method is safe to call from any thread.
*/
bool Killed() const { return killed; }
/** /**
* A version of fmt() that the thread can safely use. * A version of fmt() that the thread can safely use.
* *
@ -124,12 +145,24 @@ protected:
virtual void OnStart() {} virtual void OnStart() {}
/** /**
* Executed with Stop(). This is a hook into stopping the thread. It * Executed with PrepareStop() (and before OnStop()). This is a hook
* will be called from Bro's main thread after the thread has been * into preparing the thread for stopping. It will be called from
* signaled to stop. * Bro's main thread before the thread has been signaled to stop.
*/
virtual void OnPrepareStop() {}
/**
* Executed with Stop() (and after OnPrepareStop()). This is a hook
* into stopping the thread. It will be called from Bro's main thread
* after the thread has been signaled to stop.
*/ */
virtual void OnStop() {} virtual void OnStop() {}
/**
* Executed with Kill(). This is a hook into killing the thread.
*/
virtual void OnKill() {}
/** /**
* Destructor. This will be called by the manager. * Destructor. This will be called by the manager.
* *
@ -153,14 +186,18 @@ protected:
*/ */
void Kill(); void Kill();
/** Called by child thread's launcher when it's done processing. */
void Done();
private: private:
// pthread entry function. // pthread entry function.
static void* launcher(void *arg); static void* launcher(void *arg);
string name; const char* name;
pthread_t pthread; pthread_t pthread;
bool started; // Set to to true once running. bool started; // Set to to true once running.
bool terminating; // Set to to true to signal termination. bool terminating; // Set to to true to signal termination.
bool killed; // Set to true once forcefully killed.
// Used as a semaphore to tell the pthread thread when it may // Used as a semaphore to tell the pthread thread when it may
// terminate. // terminate.

View file

@ -30,6 +30,10 @@ void Manager::Terminate()
do Process(); while ( did_process ); do Process(); while ( did_process );
// Signal all to stop. // Signal all to stop.
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->PrepareStop();
for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ ) for ( all_thread_list::iterator i = all_threads.begin(); i != all_threads.end(); i++ )
(*i)->Stop(); (*i)->Stop();
@ -50,14 +54,14 @@ void Manager::Terminate()
void Manager::AddThread(BasicThread* thread) void Manager::AddThread(BasicThread* thread)
{ {
DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name().c_str()); DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name());
all_threads.push_back(thread); all_threads.push_back(thread);
idle = false; idle = false;
} }
void Manager::AddMsgThread(MsgThread* thread) void Manager::AddMsgThread(MsgThread* thread)
{ {
DBG_LOG(DBG_THREADING, "%s is a MsgThread ...", thread->Name().c_str()); DBG_LOG(DBG_THREADING, "%s is a MsgThread ...", thread->Name());
msg_threads.push_back(thread); msg_threads.push_back(thread);
} }
@ -114,6 +118,12 @@ void Manager::Process()
{ {
Message* msg = t->RetrieveOut(); Message* msg = t->RetrieveOut();
if ( ! msg )
{
assert(t->Killed());
break;
}
if ( msg->Process() ) if ( msg->Process() )
{ {
if ( network_time ) if ( network_time )
@ -122,8 +132,7 @@ void Manager::Process()
else else
{ {
string s = msg->Name() + " failed, terminating thread"; reporter->Error("%s failed, terminating thread", msg->Name());
reporter->Error("%s", s.c_str());
t->Stop(); t->Stop();
} }

View file

@ -29,16 +29,6 @@ private:
double network_time; double network_time;
}; };
// A dummy message that's only purpose is unblock the current read operation
// so that the child's Run() methods can check the termination status.
class UnblockMessage : public InputMessage<MsgThread>
{
public:
UnblockMessage(MsgThread* thread) : InputMessage<MsgThread>("Unblock", thread) { }
virtual bool Process() { return true; }
};
/// Sends a heartbeat to the child thread. /// Sends a heartbeat to the child thread.
class HeartbeatMessage : public InputMessage<MsgThread> class HeartbeatMessage : public InputMessage<MsgThread>
{ {
@ -66,14 +56,16 @@ public:
INTERNAL_WARNING, INTERNAL_ERROR INTERNAL_WARNING, INTERNAL_ERROR
}; };
ReporterMessage(Type arg_type, MsgThread* thread, const string& arg_msg) ReporterMessage(Type arg_type, MsgThread* thread, const char* arg_msg)
: OutputMessage<MsgThread>("ReporterMessage", thread) : OutputMessage<MsgThread>("ReporterMessage", thread)
{ type = arg_type; msg = arg_msg; } { type = arg_type; msg = copy_string(arg_msg); }
~ReporterMessage() { delete [] msg; }
virtual bool Process(); virtual bool Process();
private: private:
string msg; const char* msg;
Type type; Type type;
}; };
@ -82,18 +74,19 @@ private:
class DebugMessage : public OutputMessage<MsgThread> class DebugMessage : public OutputMessage<MsgThread>
{ {
public: public:
DebugMessage(DebugStream arg_stream, MsgThread* thread, const string& arg_msg) DebugMessage(DebugStream arg_stream, MsgThread* thread, const char* arg_msg)
: OutputMessage<MsgThread>("DebugMessage", thread) : OutputMessage<MsgThread>("DebugMessage", thread)
{ stream = arg_stream; msg = arg_msg; } { stream = arg_stream; msg = copy_string(arg_msg); }
virtual ~DebugMessage() { delete [] msg; }
virtual bool Process() virtual bool Process()
{ {
string s = Object()->Name() + ": " + msg; debug_logger.Log(stream, "%s: %s", Object()->Name(), msg);
debug_logger.Log(stream, "%s", s.c_str());
return true; return true;
} }
private: private:
string msg; const char* msg;
DebugStream stream; DebugStream stream;
}; };
#endif #endif
@ -104,41 +97,39 @@ private:
Message::~Message() Message::~Message()
{ {
delete [] name;
} }
bool ReporterMessage::Process() bool ReporterMessage::Process()
{ {
string s = Object()->Name() + ": " + msg;
const char* cmsg = s.c_str();
switch ( type ) { switch ( type ) {
case INFO: case INFO:
reporter->Info("%s", cmsg); reporter->Info("%s: %s", Object()->Name(), msg);
break; break;
case WARNING: case WARNING:
reporter->Warning("%s", cmsg); reporter->Warning("%s: %s", Object()->Name(), msg);
break; break;
case ERROR: case ERROR:
reporter->Error("%s", cmsg); reporter->Error("%s: %s", Object()->Name(), msg);
break; break;
case FATAL_ERROR: case FATAL_ERROR:
reporter->FatalError("%s", cmsg); reporter->FatalError("%s: %s", Object()->Name(), msg);
break; break;
case FATAL_ERROR_WITH_CORE: case FATAL_ERROR_WITH_CORE:
reporter->FatalErrorWithCore("%s", cmsg); reporter->FatalErrorWithCore("%s: %s", Object()->Name(), msg);
break; break;
case INTERNAL_WARNING: case INTERNAL_WARNING:
reporter->InternalWarning("%s", cmsg); reporter->InternalWarning("%s: %s", Object()->Name(), msg);
break; break;
case INTERNAL_ERROR : case INTERNAL_ERROR :
reporter->InternalError("%s", cmsg); reporter->InternalError("%s: %s", Object()->Name(), msg);
break; break;
default: default:
@ -148,62 +139,78 @@ bool ReporterMessage::Process()
return true; return true;
} }
MsgThread::MsgThread() : BasicThread() MsgThread::MsgThread() : BasicThread(), queue_in(this, 0), queue_out(0, this)
{ {
cnt_sent_in = cnt_sent_out = 0; cnt_sent_in = cnt_sent_out = 0;
finished = false; finished = false;
stopped = false;
thread_mgr->AddMsgThread(this); thread_mgr->AddMsgThread(this);
} }
// Set by Bro's main signal handler. // Set by Bro's main signal handler.
extern int signal_val; extern int signal_val;
void MsgThread::OnStop() void MsgThread::OnPrepareStop()
{ {
if ( stopped ) if ( finished || Killed() )
return; return;
// XX fprintf(stderr, "Sending FINISH to thread %s ...\n", Name());
// Signal thread to terminate and wait until it has acknowledged. // Signal thread to terminate and wait until it has acknowledged.
SendIn(new FinishMessage(this, network_time), true); SendIn(new FinishMessage(this, network_time), true);
}
void MsgThread::OnStop()
{
int signal_count = 0;
int old_signal_val = signal_val; int old_signal_val = signal_val;
signal_val = 0; signal_val = 0;
int cnt = 0; int cnt = 0;
bool aborted = 0; uint64_t last_size = 0;
uint64_t cur_size = 0;
while ( ! finished ) // XX fprintf(stderr, "WAITING for thread %s to stop ...\n", Name());
while ( ! (finished || Killed() ) )
{ {
// Terminate if we get another kill signal. // Terminate if we get another kill signal.
if ( signal_val == SIGTERM || signal_val == SIGINT ) if ( signal_val == SIGTERM || signal_val == SIGINT )
{
++signal_count;
if ( signal_count == 1 )
{ {
// Abort all threads here so that we won't hang next // Abort all threads here so that we won't hang next
// on another one. // on another one.
fprintf(stderr, "received signal while waiting for thread %s, aborting all ...\n", Name().c_str()); fprintf(stderr, "received signal while waiting for thread %s, aborting all ...\n", Name());
thread_mgr->KillThreads(); thread_mgr->KillThreads();
aborted = true; }
break; else
{
// More than one signal. Abort processing
// right away. on another one.
fprintf(stderr, "received another signal while waiting for thread %s, aborting processing\n", Name());
exit(1);
} }
if ( ++cnt % 10000 == 0 ) // Insurance against broken threads ... signal_val = 0;
{
fprintf(stderr, "killing thread %s ...\n", Name().c_str());
Kill();
aborted = true;
break;
} }
queue_in.WakeUp();
usleep(1000); usleep(1000);
} }
Finished();
signal_val = old_signal_val; signal_val = old_signal_val;
}
// One more message to make sure the current queue read operation unblocks. void MsgThread::OnKill()
if ( ! aborted ) {
SendIn(new UnblockMessage(this), true); // Send a message to unblock the reader if its currently waiting for
// input. This is just an optimization to make it terminate more
// quickly, even without the message it will eventually time out.
queue_in.WakeUp();
} }
void MsgThread::Heartbeat() void MsgThread::Heartbeat()
@ -213,9 +220,7 @@ void MsgThread::Heartbeat()
void MsgThread::HeartbeatInChild() void MsgThread::HeartbeatInChild()
{ {
string n = Name(); string n = Fmt("bro: %s (%" PRIu64 "/%" PRIu64 ")", Name(),
n = Fmt("bro: %s (%" PRIu64 "/%" PRIu64 ")", n.c_str(),
cnt_sent_in - queue_in.Size(), cnt_sent_in - queue_in.Size(),
cnt_sent_out - queue_out.Size()); cnt_sent_out - queue_out.Size());
@ -283,7 +288,7 @@ void MsgThread::SendIn(BasicInputMessage* msg, bool force)
return; return;
} }
DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name().c_str(), Name().c_str()); DBG_LOG(DBG_THREADING, "Sending '%s' to %s ...", msg->Name(), Name());
queue_in.Put(msg); queue_in.Put(msg);
++cnt_sent_in; ++cnt_sent_in;
@ -306,9 +311,10 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
BasicOutputMessage* MsgThread::RetrieveOut() BasicOutputMessage* MsgThread::RetrieveOut()
{ {
BasicOutputMessage* msg = queue_out.Get(); BasicOutputMessage* msg = queue_out.Get();
assert(msg); if ( ! msg )
return 0;
DBG_LOG(DBG_THREADING, "Retrieved '%s' from %s", msg->Name().c_str(), Name().c_str()); DBG_LOG(DBG_THREADING, "Retrieved '%s' from %s", msg->Name(), Name());
return msg; return msg;
} }
@ -316,10 +322,12 @@ BasicOutputMessage* MsgThread::RetrieveOut()
BasicInputMessage* MsgThread::RetrieveIn() BasicInputMessage* MsgThread::RetrieveIn()
{ {
BasicInputMessage* msg = queue_in.Get(); BasicInputMessage* msg = queue_in.Get();
assert(msg);
if ( ! msg )
return 0;
#ifdef DEBUG #ifdef DEBUG
string s = Fmt("Retrieved '%s' in %s", msg->Name().c_str(), Name().c_str()); string s = Fmt("Retrieved '%s' in %s", msg->Name(), Name());
Debug(DBG_THREADING, s.c_str()); Debug(DBG_THREADING, s.c_str());
#endif #endif
@ -328,15 +336,18 @@ BasicInputMessage* MsgThread::RetrieveIn()
void MsgThread::Run() void MsgThread::Run()
{ {
while ( ! finished ) while ( ! (finished || Killed() ) )
{ {
BasicInputMessage* msg = RetrieveIn(); BasicInputMessage* msg = RetrieveIn();
if ( ! msg )
continue;
bool result = msg->Process(); bool result = msg->Process();
if ( ! result ) if ( ! result )
{ {
string s = msg->Name() + " failed, terminating thread (MsgThread)"; string s = Fmt("%s failed, terminating thread (MsgThread)", Name());
Error(s.c_str()); Error(s.c_str());
break; break;
} }

View file

@ -228,6 +228,8 @@ protected:
*/ */
virtual void Run(); virtual void Run();
virtual void OnStop(); virtual void OnStop();
virtual void OnPrepareStop();
virtual void OnKill();
private: private:
/** /**
@ -293,7 +295,6 @@ private:
uint64_t cnt_sent_out; // Counts message sent by child. uint64_t cnt_sent_out; // Counts message sent by child.
bool finished; // Set to true by Finished message. bool finished; // Set to true by Finished message.
bool stopped; // Set to true by OnStop().
}; };
/** /**
@ -312,7 +313,7 @@ public:
* what's passed into the constructor and used mainly for debugging * what's passed into the constructor and used mainly for debugging
* purposes. * purposes.
*/ */
const string& Name() const { return name; } const char* Name() const { return name; }
/** /**
* Callback that must be overriden for processing a message. * Callback that must be overriden for processing a message.
@ -326,10 +327,11 @@ protected:
* @param arg_name A descriptive name for the type of message. Used * @param arg_name A descriptive name for the type of message. Used
* mainly for debugging purposes. * mainly for debugging purposes.
*/ */
Message(const string& arg_name) { name = arg_name; } Message(const char* arg_name)
{ name = copy_string(arg_name); }
private: private:
string name; const char* name;
}; };
/** /**
@ -344,7 +346,7 @@ protected:
* @param name A descriptive name for the type of message. Used * @param name A descriptive name for the type of message. Used
* mainly for debugging purposes. * mainly for debugging purposes.
*/ */
BasicInputMessage(const string& name) : Message(name) {} BasicInputMessage(const char* name) : Message(name) {}
}; };
/** /**
@ -359,7 +361,7 @@ protected:
* @param name A descriptive name for the type of message. Used * @param name A descriptive name for the type of message. Used
* mainly for debugging purposes. * mainly for debugging purposes.
*/ */
BasicOutputMessage(const string& name) : Message(name) {} BasicOutputMessage(const char* name) : Message(name) {}
}; };
/** /**
@ -384,7 +386,7 @@ protected:
* *
* @param arg_object: An object to store with the message. * @param arg_object: An object to store with the message.
*/ */
InputMessage(const string& name, O* arg_object) : BasicInputMessage(name) InputMessage(const char* name, O* arg_object) : BasicInputMessage(name)
{ object = arg_object; } { object = arg_object; }
private: private:
@ -413,7 +415,7 @@ protected:
* *
* @param arg_object An object to store with the message. * @param arg_object An object to store with the message.
*/ */
OutputMessage(const string& name, O* arg_object) : BasicOutputMessage(name) OutputMessage(const char* name, O* arg_object) : BasicOutputMessage(name)
{ object = arg_object; } { object = arg_object; }
private: private:

View file

@ -1,4 +1,3 @@
#ifndef THREADING_QUEUE_H #ifndef THREADING_QUEUE_H
#define THREADING_QUEUE_H #define THREADING_QUEUE_H
@ -6,11 +5,28 @@
#include <queue> #include <queue>
#include <deque> #include <deque>
#include <stdint.h> #include <stdint.h>
#include <sys/time.h>
#include "Reporter.h" #include "Reporter.h"
#include "BasicThread.h"
#undef Queue // Defined elsewhere unfortunately. #undef Queue // Defined elsewhere unfortunately.
#if 1
// We don't have pthread spinlocks on DARWIN.
# define PTHREAD_MUTEX_T pthread_mutex_t
# define PTHREAD_MUTEX_LOCK(x) pthread_mutex_lock(x)
# define PTHREAD_MUTEX_UNLOCK(x) pthread_mutex_unlock(x)
# define PTHREAD_MUTEX_INIT(x) pthread_mutex_init(x, 0)
# define PTHREAD_MUTEX_DESTROY(x) pthread_mutex_destroy(x)
#else
# define PTHREAD_MUTEX_T pthrea_spinlock_T
# define PTHREAD_MUTEX_LOCK(x) pthrea_spin_lock(x)
# define PTHREAD_MUTEX_UNLOCK(x) pthrea_spin_unlock(x)
# define PTHREAD_MUTEX_INIT(x) pthrea_spin_init(x, PTHREAD_PROCESS_PRIVATE)
# define PTHREAD_MUTEX_DESTROY(x) pthrea_spin_destroy(x)
#endif
namespace threading { namespace threading {
/** /**
@ -30,8 +46,12 @@ class Queue
public: public:
/** /**
* Constructor. * Constructor.
*
* reader, writer: The corresponding threads. This is for checking
* whether they have terminated so that we can abort I/O opeations.
* Can be left null for the main thread.
*/ */
Queue(); Queue(BasicThread* arg_reader, BasicThread* arg_writer);
/** /**
* Destructor. * Destructor.
@ -39,7 +59,9 @@ public:
~Queue(); ~Queue();
/** /**
* Retrieves one elment. * Retrieves one elment. This may block for a little while of no
* input is available and eventually return with a null element if
* nothing shows up.
*/ */
T Get(); T Get();
@ -60,6 +82,11 @@ public:
*/ */
bool MaybeReady() { return ( ( read_ptr - write_ptr) != 0 ); } bool MaybeReady() { return ( ( read_ptr - write_ptr) != 0 ); }
/** Wake up the reader if it's currently blocked for input. This is
primarily to give it a chance to check termination quickly.
**/
void WakeUp();
/** /**
* Returns the number of queued items not yet retrieved. * Returns the number of queued items not yet retrieved.
*/ */
@ -82,45 +109,50 @@ public:
void GetStats(Stats* stats); void GetStats(Stats* stats);
private: private:
static const int NUM_QUEUES = 8; static const int NUM_QUEUES = 15;
pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses. PTHREAD_MUTEX_T mutex[NUM_QUEUES]; // Mutex protected shared accesses.
pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available
std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages
int read_ptr; // Where the next operation will read from int read_ptr; // Where the next operation will read from
int write_ptr; // Where the next operation will write to int write_ptr; // Where the next operation will write to
BasicThread* reader;
BasicThread* writer;
// Statistics. // Statistics.
uint64_t num_reads; uint64_t num_reads;
uint64_t num_writes; uint64_t num_writes;
}; };
inline static void safe_lock(pthread_mutex_t* mutex) inline static void safe_lock(PTHREAD_MUTEX_T* mutex)
{ {
if ( pthread_mutex_lock(mutex) != 0 ) if ( PTHREAD_MUTEX_LOCK(mutex) != 0 )
reporter->FatalErrorWithCore("cannot lock mutex"); reporter->FatalErrorWithCore("cannot lock mutex");
} }
inline static void safe_unlock(pthread_mutex_t* mutex) inline static void safe_unlock(PTHREAD_MUTEX_T* mutex)
{ {
if ( pthread_mutex_unlock(mutex) != 0 ) if ( PTHREAD_MUTEX_UNLOCK(mutex) != 0 )
reporter->FatalErrorWithCore("cannot unlock mutex"); reporter->FatalErrorWithCore("cannot unlock mutex");
} }
template<typename T> template<typename T>
inline Queue<T>::Queue() inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
{ {
read_ptr = 0; read_ptr = 0;
write_ptr = 0; write_ptr = 0;
num_reads = num_writes = 0; num_reads = num_writes = 0;
reader = arg_reader;
writer = arg_writer;
for( int i = 0; i < NUM_QUEUES; ++i ) for( int i = 0; i < NUM_QUEUES; ++i )
{ {
if ( pthread_cond_init(&has_data[i], NULL) != 0 ) if ( pthread_cond_init(&has_data[i], 0) != 0 )
reporter->FatalError("cannot init queue condition variable"); reporter->FatalError("cannot init queue condition variable");
if ( pthread_mutex_init(&mutex[i], NULL) != 0 ) if ( PTHREAD_MUTEX_INIT(&mutex[i]) != 0 )
reporter->FatalError("cannot init queue mutex"); reporter->FatalError("cannot init queue mutex");
} }
} }
@ -131,19 +163,30 @@ inline Queue<T>::~Queue()
for( int i = 0; i < NUM_QUEUES; ++i ) for( int i = 0; i < NUM_QUEUES; ++i )
{ {
pthread_cond_destroy(&has_data[i]); pthread_cond_destroy(&has_data[i]);
pthread_mutex_destroy(&mutex[i]); PTHREAD_MUTEX_DESTROY(&mutex[i]);
} }
} }
template<typename T> template<typename T>
inline T Queue<T>::Get() inline T Queue<T>::Get()
{ {
if ( (reader && reader->Killed()) || (writer && writer->Killed()) )
return 0;
safe_lock(&mutex[read_ptr]); safe_lock(&mutex[read_ptr]);
int old_read_ptr = read_ptr; int old_read_ptr = read_ptr;
if ( messages[read_ptr].empty() ) if ( messages[read_ptr].empty() )
pthread_cond_wait(&has_data[read_ptr], &mutex[read_ptr]); {
struct timespec ts;
ts.tv_sec = time(0) + 5;
ts.tv_nsec = 0;
pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts);
safe_unlock(&mutex[read_ptr]);
return 0;
}
T data = messages[read_ptr].front(); T data = messages[read_ptr].front();
messages[read_ptr].pop(); messages[read_ptr].pop();
@ -222,6 +265,17 @@ inline void Queue<T>::GetStats(Stats* stats)
safe_unlock(&mutex[i]); safe_unlock(&mutex[i]);
} }
template<typename T>
inline void Queue<T>::WakeUp()
{
for ( int i = 0; i < NUM_QUEUES; i++ )
{
safe_lock(&mutex[i]);
pthread_cond_signal(&has_data[i]);
safe_unlock(&mutex[i]);
}
}
} }

View file

@ -11,23 +11,54 @@ bool Field::Read(SerializationFormat* fmt)
{ {
int t; int t;
int st; int st;
string tmp_name;
bool have_2nd;
bool success = (fmt->Read(&name, "name") if ( ! fmt->Read(&have_2nd, "have_2nd") )
&& fmt->Read(&secondary_name, "secondary_name") return false;
if ( have_2nd )
{
string tmp_secondary_name;
if ( ! fmt->Read(&tmp_secondary_name, "secondary_name") )
return false;
secondary_name = copy_string(tmp_secondary_name.c_str());
}
else
secondary_name = 0;
bool success = (fmt->Read(&tmp_name, "name")
&& fmt->Read(&t, "type") && fmt->Read(&t, "type")
&& fmt->Read(&st, "subtype") && fmt->Read(&st, "subtype")
&& fmt->Read(&optional, "optional")); && fmt->Read(&optional, "optional"));
if ( ! success )
return false;
name = copy_string(tmp_name.c_str());
type = (TypeTag) t; type = (TypeTag) t;
subtype = (TypeTag) st; subtype = (TypeTag) st;
return success; return true;
} }
bool Field::Write(SerializationFormat* fmt) const bool Field::Write(SerializationFormat* fmt) const
{ {
assert(name);
if ( secondary_name )
{
if ( ! (fmt->Write(true, "have_2nd")
&& fmt->Write(secondary_name, "secondary_name")) )
return false;
}
else
if ( ! fmt->Write(false, "have_2nd") )
return false;
return (fmt->Write(name, "name") return (fmt->Write(name, "name")
&& fmt->Write(secondary_name, "secondary_name")
&& fmt->Write((int)type, "type") && fmt->Write((int)type, "type")
&& fmt->Write((int)subtype, "subtype"), && fmt->Write((int)subtype, "subtype"),
fmt->Write(optional, "optional")); fmt->Write(optional, "optional"));
@ -51,7 +82,7 @@ Value::~Value()
{ {
if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC) if ( (type == TYPE_ENUM || type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC)
&& present ) && present )
delete val.string_val; delete [] val.string_val.data;
if ( type == TYPE_TABLE && present ) if ( type == TYPE_TABLE && present )
{ {
@ -224,10 +255,7 @@ bool Value::Read(SerializationFormat* fmt)
case TYPE_STRING: case TYPE_STRING:
case TYPE_FILE: case TYPE_FILE:
case TYPE_FUNC: case TYPE_FUNC:
{ return fmt->Read(&val.string_val.data, &val.string_val.length, "string");
val.string_val = new string;
return fmt->Read(val.string_val, "string");
}
case TYPE_TABLE: case TYPE_TABLE:
{ {
@ -339,7 +367,7 @@ bool Value::Write(SerializationFormat* fmt) const
case TYPE_STRING: case TYPE_STRING:
case TYPE_FILE: case TYPE_FILE:
case TYPE_FUNC: case TYPE_FUNC:
return fmt->Write(*val.string_val, "string"); return fmt->Write(val.string_val.data, val.string_val.length, "string");
case TYPE_TABLE: case TYPE_TABLE:
{ {

View file

@ -12,6 +12,7 @@
using namespace std; using namespace std;
class SerializationFormat; class SerializationFormat;
class RemoteSerializer;
namespace threading { namespace threading {
@ -19,10 +20,10 @@ namespace threading {
* Definition of a log file, i.e., one column of a log stream. * Definition of a log file, i.e., one column of a log stream.
*/ */
struct Field { struct Field {
string name; //! Name of the field. const char* name; //! Name of the field.
//! Needed by input framework. Port fields have two names (one for the //! Needed by input framework. Port fields have two names (one for the
//! port, one for the type), and this specifies the secondary name. //! port, one for the type), and this specifies the secondary name.
string secondary_name; const char* secondary_name;
TypeTag type; //! Type of the field. TypeTag type; //! Type of the field.
TypeTag subtype; //! Inner type for sets. TypeTag subtype; //! Inner type for sets.
bool optional; //! True if field is optional. bool optional; //! True if field is optional.
@ -30,13 +31,24 @@ struct Field {
/** /**
* Constructor. * Constructor.
*/ */
Field() { subtype = TYPE_VOID; optional = false; } Field(const char* name, const char* secondary_name, TypeTag type, TypeTag subtype, bool optional)
: name(name ? copy_string(name) : 0),
secondary_name(secondary_name ? copy_string(secondary_name) : 0),
type(type), subtype(subtype), optional(optional) { }
/** /**
* Copy constructor. * Copy constructor.
*/ */
Field(const Field& other) Field(const Field& other)
: name(other.name), type(other.type), subtype(other.subtype), optional(other.optional) { } : name(other.name ? copy_string(other.name) : 0),
secondary_name(other.secondary_name ? copy_string(other.secondary_name) : 0),
type(other.type), subtype(other.subtype), optional(other.optional) { }
~Field()
{
delete [] name;
delete [] secondary_name;
}
/** /**
* Unserializes a field. * Unserializes a field.
@ -63,6 +75,12 @@ struct Field {
* thread-safe. * thread-safe.
*/ */
string TypeName() const; string TypeName() const;
private:
friend class ::RemoteSerializer;
// Force usage of constructor above.
Field() {};
}; };
/** /**
@ -102,7 +120,11 @@ struct Value {
vec_t vector_val; vec_t vector_val;
addr_t addr_val; addr_t addr_val;
subnet_t subnet_val; subnet_t subnet_val;
string* string_val;
struct {
char* data;
int length;
} string_val;
} val; } val;
/** /**
@ -147,7 +169,7 @@ struct Value {
static bool IsCompatibleType(BroType* t, bool atomic_only=false); static bool IsCompatibleType(BroType* t, bool atomic_only=false);
private: private:
friend class ::IPAddr; friend class ::IPAddr;
Value(const Value& other) { } // Disabled. Value(const Value& other) { } // Disabled.
}; };

View file

@ -11,8 +11,8 @@
# @TEST-EXEC: cat receiver/http.log $SCRIPTS/diff-remove-timestamps >receiver.http.log # @TEST-EXEC: cat receiver/http.log $SCRIPTS/diff-remove-timestamps >receiver.http.log
# @TEST-EXEC: cmp sender.http.log receiver.http.log # @TEST-EXEC: cmp sender.http.log receiver.http.log
# #
# @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.snd.log # @TEST-EXEC: bro -x sender/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' | $SCRIPTS/diff-remove-timestamps >events.snd.log
# @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' >events.rec.log # @TEST-EXEC: bro -x receiver/events.bst | sed 's/^Event \[[-0-9.]*\] //g' | grep '^http_' | grep -v http_stats | sed 's/(.*$//g' | $SCRIPTS/diff-remove-timestamps >events.rec.log
# @TEST-EXEC: btest-diff events.rec.log # @TEST-EXEC: btest-diff events.rec.log
# @TEST-EXEC: btest-diff events.snd.log # @TEST-EXEC: btest-diff events.snd.log
# @TEST-EXEC: cmp events.rec.log events.snd.log # @TEST-EXEC: cmp events.rec.log events.snd.log

View file

@ -3,6 +3,4 @@
# Replace anything which looks like timestamps with XXXs (including the #start/end markers in logs). # Replace anything which looks like timestamps with XXXs (including the #start/end markers in logs).
sed 's/[0-9]\{10\}\.[0-9]\{2,8\}/XXXXXXXXXX.XXXXXX/g' | \ sed 's/[0-9]\{10\}\.[0-9]\{2,8\}/XXXXXXXXXX.XXXXXX/g' | \
sed 's/^#\(start\|end\).20..-..-..-..-..-..$/#\1 XXXX-XX-XX-XX-XX-XX/g' | \ sed 's/^#\(start\|end\).20..-..-..-..-..-..$/#\1 XXXX-XX-XX-XX-XX-XX/g'
grep -v '#start' | grep -v '#end'