make input framework source (hopefully) adhere to the usual indentation

style. No functional changes.
This commit is contained in:
Bernhard Amann 2012-05-25 10:33:22 -07:00
parent 82a6f3832a
commit 2034c10e97
6 changed files with 1102 additions and 897 deletions

File diff suppressed because it is too large Load diff

View file

@ -15,7 +15,8 @@ public:
: threading::OutputMessage<ReaderFrontend>("Put", reader), : threading::OutputMessage<ReaderFrontend>("Put", reader),
val(val) {} val(val) {}
virtual bool Process() { virtual bool Process()
{
input_mgr->Put(Object(), val); input_mgr->Put(Object(), val);
return true; return true;
} }
@ -30,7 +31,8 @@ public:
: threading::OutputMessage<ReaderFrontend>("Delete", reader), : threading::OutputMessage<ReaderFrontend>("Delete", reader),
val(val) {} val(val) {}
virtual bool Process() { virtual bool Process()
{
return input_mgr->Delete(Object(), val); return input_mgr->Delete(Object(), val);
} }
@ -43,7 +45,8 @@ public:
ClearMessage(ReaderFrontend* reader) ClearMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("Clear", reader) {} : threading::OutputMessage<ReaderFrontend>("Clear", reader) {}
virtual bool Process() { virtual bool Process()
{
input_mgr->Clear(Object()); input_mgr->Clear(Object());
return true; return true;
} }
@ -57,7 +60,8 @@ public:
: threading::OutputMessage<ReaderFrontend>("SendEvent", reader), : threading::OutputMessage<ReaderFrontend>("SendEvent", reader),
name(name), num_vals(num_vals), val(val) {} name(name), num_vals(num_vals), val(val) {}
virtual bool Process() { virtual bool Process()
{
bool success = input_mgr->SendEvent(name, num_vals, val); bool success = input_mgr->SendEvent(name, num_vals, val);
if ( !success ) if ( !success )
@ -78,7 +82,8 @@ public:
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader), : threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
val(val) { } val(val) { }
virtual bool Process() { virtual bool Process()
{
input_mgr->SendEntry(Object(), val); input_mgr->SendEntry(Object(), val);
return true; return true;
} }
@ -92,7 +97,8 @@ public:
EndCurrentSendMessage(ReaderFrontend* reader) EndCurrentSendMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader) {} : threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader) {}
virtual bool Process() { virtual bool Process()
{
input_mgr->EndCurrentSend(Object()); input_mgr->EndCurrentSend(Object());
return true; return true;
} }
@ -105,7 +111,8 @@ public:
ReaderClosedMessage(ReaderFrontend* reader) ReaderClosedMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("ReaderClosed", reader) {} : threading::OutputMessage<ReaderFrontend>("ReaderClosed", reader) {}
virtual bool Process() { virtual bool Process()
{
return input_mgr->RemoveStreamContinuation(Object()); return input_mgr->RemoveStreamContinuation(Object());
} }
@ -119,12 +126,16 @@ public:
DisableMessage(ReaderFrontend* writer) DisableMessage(ReaderFrontend* writer)
: threading::OutputMessage<ReaderFrontend>("Disable", writer) {} : threading::OutputMessage<ReaderFrontend>("Disable", writer) {}
virtual bool Process() { Object()->SetDisable(); return true; } virtual bool Process()
{
Object()->SetDisable();
return true;
}
}; };
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread() ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{ {
buf = 0; buf = 0;
buf_len = 1024; buf_len = 1024;
disabled = true; // disabled will be set correcty in init. disabled = true; // disabled will be set correcty in init.
@ -132,45 +143,45 @@ ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
frontend = arg_frontend; frontend = arg_frontend;
SetName(frontend->Name()); SetName(frontend->Name());
} }
ReaderBackend::~ReaderBackend() ReaderBackend::~ReaderBackend()
{ {
}
}
void ReaderBackend::Put(Value* *val) void ReaderBackend::Put(Value* *val)
{ {
SendOut(new PutMessage(frontend, val)); SendOut(new PutMessage(frontend, val));
} }
void ReaderBackend::Delete(Value* *val) void ReaderBackend::Delete(Value* *val)
{ {
SendOut(new DeleteMessage(frontend, val)); SendOut(new DeleteMessage(frontend, val));
} }
void ReaderBackend::Clear() void ReaderBackend::Clear()
{ {
SendOut(new ClearMessage(frontend)); SendOut(new ClearMessage(frontend));
} }
void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals) void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals)
{ {
SendOut(new SendEventMessage(frontend, name, num_vals, vals)); SendOut(new SendEventMessage(frontend, name, num_vals, vals));
} }
void ReaderBackend::EndCurrentSend() void ReaderBackend::EndCurrentSend()
{ {
SendOut(new EndCurrentSendMessage(frontend)); SendOut(new EndCurrentSendMessage(frontend));
} }
void ReaderBackend::SendEntry(Value* *vals) void ReaderBackend::SendEntry(Value* *vals)
{ {
SendOut(new SendEntryMessage(frontend, vals)); SendOut(new SendEntryMessage(frontend, vals));
} }
bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* arg_fields) bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields,
{ const threading::Field* const* arg_fields)
{
source = arg_source; source = arg_source;
SetName("InputReader/"+source); SetName("InputReader/"+source);
@ -180,7 +191,8 @@ bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields,
// disable if DoInit returns error. // disable if DoInit returns error.
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields); int success = DoInit(arg_source, mode, arg_num_fields, arg_fields);
if ( !success ) { if ( !success )
{
Error("Init failed"); Error("Init failed");
DisableFrontend(); DisableFrontend();
} }
@ -188,78 +200,78 @@ bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields,
disabled = !success; disabled = !success;
return success; return success;
} }
void ReaderBackend::Close() void ReaderBackend::Close()
{ {
DoClose(); DoClose();
disabled = true; disabled = true;
DisableFrontend(); DisableFrontend();
SendOut(new ReaderClosedMessage(frontend)); SendOut(new ReaderClosedMessage(frontend));
if ( fields != 0 ) { if ( fields != 0 )
{
for ( unsigned int i = 0; i < num_fields; i++ ) { for ( unsigned int i = 0; i < num_fields; i++ )
delete(fields[i]); delete(fields[i]);
}
delete[] (fields); delete[] (fields);
fields = 0; fields = 0;
} }
} }
bool ReaderBackend::Update() bool ReaderBackend::Update()
{ {
if ( disabled ) if ( disabled )
return false; return false;
bool success = DoUpdate(); bool success = DoUpdate();
if ( !success ) { if ( !success )
DisableFrontend(); DisableFrontend();
}
return success; return success;
} }
void ReaderBackend::DisableFrontend() void ReaderBackend::DisableFrontend()
{ {
disabled = true; // we also set disabled here, because there still may be other messages queued and we will dutifully ignore these from now disabled = true;
// we also set disabled here, because there still may be other messages queued and we will dutifully ignore these from now
SendOut(new DisableMessage(frontend)); SendOut(new DisableMessage(frontend));
} }
bool ReaderBackend::DoHeartbeat(double network_time, double current_time) bool ReaderBackend::DoHeartbeat(double network_time, double current_time)
{ {
MsgThread::DoHeartbeat(network_time, current_time); MsgThread::DoHeartbeat(network_time, current_time);
return true; return true;
}
TransportProto ReaderBackend::StringToProto(const string &proto) {
if ( proto == "unknown" ) {
return TRANSPORT_UNKNOWN;
} else if ( proto == "tcp" ) {
return TRANSPORT_TCP;
} else if ( proto == "udp" ) {
return TRANSPORT_UDP;
} else if ( proto == "icmp" ) {
return TRANSPORT_ICMP;
} }
TransportProto ReaderBackend::StringToProto(const string &proto)
{
if ( proto == "unknown" )
return TRANSPORT_UNKNOWN;
else if ( proto == "tcp" )
return TRANSPORT_TCP;
else if ( proto == "udp" )
return TRANSPORT_UDP;
else if ( proto == "icmp" )
return TRANSPORT_ICMP;
Error(Fmt("Tried to parse invalid/unknown protocol: %s", proto.c_str())); Error(Fmt("Tried to parse invalid/unknown protocol: %s", proto.c_str()));
return TRANSPORT_UNKNOWN; return TRANSPORT_UNKNOWN;
} }
// more or less verbose copy from IPAddr.cc -- which uses reporter // more or less verbose copy from IPAddr.cc -- which uses reporter
Value::addr_t ReaderBackend::StringToAddr(const string &s) { Value::addr_t ReaderBackend::StringToAddr(const string &s)
{
Value::addr_t val; Value::addr_t val;
if ( s.find(':') == std::string::npos ) // IPv4. if ( s.find(':') == std::string::npos ) // IPv4.
{ {
val.family = IPv4; val.family = IPv4;
if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 ) { if ( inet_aton(s.c_str(), &(val.in.in4)) <= 0 )
{
Error(Fmt("Bad addres: %s", s.c_str())); Error(Fmt("Bad addres: %s", s.c_str()));
memset(&val.in.in4.s_addr, 0, sizeof(val.in.in4.s_addr)); memset(&val.in.in4.s_addr, 0, sizeof(val.in.in4.s_addr));
} }
@ -277,6 +289,6 @@ Value::addr_t ReaderBackend::StringToAddr(const string &s) {
} }
return val; return val;
} }
} }

View file

@ -46,19 +46,23 @@ public:
}; };
ReaderFrontend::ReaderFrontend(bro_int_t type) { ReaderFrontend::ReaderFrontend(bro_int_t type)
{
disabled = initialized = false; disabled = initialized = false;
ty_name = "<not set>"; ty_name = "<not set>";
backend = input_mgr->CreateBackend(this, type); backend = input_mgr->CreateBackend(this, type);
assert(backend); assert(backend);
backend->Start(); backend->Start();
} }
ReaderFrontend::~ReaderFrontend() { ReaderFrontend::~ReaderFrontend()
} {
}
void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, const threading::Field* const* fields) { void ReaderFrontend::Init(string arg_source, int mode, const int num_fields,
const threading::Field* const* fields)
{
if ( disabled ) if ( disabled )
return; return;
@ -69,39 +73,43 @@ void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, con
initialized = true; initialized = true;
backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields)); backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields));
} }
void ReaderFrontend::Update() { void ReaderFrontend::Update()
{
if ( disabled ) if ( disabled )
return; return;
if ( !initialized ) { if ( !initialized )
{
reporter->Error("Tried to call update on uninitialized reader"); reporter->Error("Tried to call update on uninitialized reader");
return; return;
} }
backend->SendIn(new UpdateMessage(backend)); backend->SendIn(new UpdateMessage(backend));
} }
void ReaderFrontend::Close() { void ReaderFrontend::Close()
{
if ( disabled ) if ( disabled )
return; return;
if ( !initialized ) { if ( !initialized )
{
reporter->Error("Tried to call finish on uninitialized reader"); reporter->Error("Tried to call finish on uninitialized reader");
return; return;
} }
backend->SendIn(new CloseMessage(backend)); backend->SendIn(new CloseMessage(backend));
} }
string ReaderFrontend::Name() const string ReaderFrontend::Name() const
{ {
if ( source.size() ) if ( source.size() )
return ty_name; return ty_name;
return ty_name + "/" + source; return ty_name + "/" + source;
} }
} }

View file

@ -23,69 +23,73 @@ using threading::Field;
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position) FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position)
: name(arg_name), type(arg_type) : name(arg_name), type(arg_type)
{ {
position = arg_position; position = arg_position;
secondary_position = -1; secondary_position = -1;
present = true; present = true;
} }
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position) FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type,
const TypeTag& arg_subtype, int arg_position)
: name(arg_name), type(arg_type), subtype(arg_subtype) : name(arg_name), type(arg_type), subtype(arg_subtype)
{ {
position = arg_position; position = arg_position;
secondary_position = -1; secondary_position = -1;
present = true; present = true;
} }
FieldMapping::FieldMapping(const FieldMapping& arg) FieldMapping::FieldMapping(const FieldMapping& arg)
: name(arg.name), type(arg.type), subtype(arg.subtype), present(arg.present) : name(arg.name), type(arg.type), subtype(arg.subtype), present(arg.present)
{ {
position = arg.position; position = arg.position;
secondary_position = arg.secondary_position; secondary_position = arg.secondary_position;
} }
FieldMapping FieldMapping::subType() { FieldMapping FieldMapping::subType()
{
return FieldMapping(name, subtype, position); return FieldMapping(name, subtype, position);
} }
Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend) Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
file = 0; file = 0;
//keyMap = new map<string, string>();
separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), BifConst::InputAscii::separator->Len()); separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(),
if ( separator.size() != 1 ) { BifConst::InputAscii::separator->Len());
if ( separator.size() != 1 )
Error("separator length has to be 1. Separator will be truncated."); Error("separator length has to be 1. Separator will be truncated.");
}
set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(), BifConst::InputAscii::set_separator->Len()); set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(),
if ( set_separator.size() != 1 ) { BifConst::InputAscii::set_separator->Len());
if ( set_separator.size() != 1 )
Error("set_separator length has to be 1. Separator will be truncated."); Error("set_separator length has to be 1. Separator will be truncated.");
}
empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), BifConst::InputAscii::empty_field->Len()); empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(),
BifConst::InputAscii::empty_field->Len());
unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), BifConst::InputAscii::unset_field->Len()); unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(),
BifConst::InputAscii::unset_field->Len());
} }
Ascii::~Ascii() Ascii::~Ascii()
{ {
DoClose(); DoClose();
} }
void Ascii::DoClose() void Ascii::DoClose()
{ {
if ( file != 0 ) { if ( file != 0 )
{
file->close(); file->close();
delete(file); delete(file);
file = 0; file = 0;
} }
} }
bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
{ {
fname = path; fname = path;
mode = arg_mode; mode = arg_mode;
mtime = 0; mtime = 0;
@ -93,20 +97,23 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) )
{
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false; return false;
} }
file = new ifstream(path.c_str()); file = new ifstream(path.c_str());
if ( !file->is_open() ) { if ( !file->is_open() )
{
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
delete(file); delete(file);
file = 0; file = 0;
return false; return false;
} }
if ( ReadHeader(false) == false ) { if ( ReadHeader(false) == false )
{
Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str())); Error(Fmt("Init: cannot open %s; headers are incorrect", fname.c_str()));
file->close(); file->close();
delete(file); delete(file);
@ -117,32 +124,33 @@ bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* c
DoUpdate(); DoUpdate();
return true; return true;
} }
bool Ascii::ReadHeader(bool useCached) { bool Ascii::ReadHeader(bool useCached)
{
// try to read the header line... // try to read the header line...
string line; string line;
map<string, uint32_t> ifields; map<string, uint32_t> ifields;
if ( !useCached ) { if ( !useCached )
if ( !GetLine(line) ) { {
if ( !GetLine(line) )
{
Error("could not read first line"); Error("could not read first line");
return false; return false;
} }
headerline = line; headerline = line;
} else {
line = headerline;
} }
else
line = headerline;
// construct list of field names. // construct list of field names.
istringstream splitstream(line); istringstream splitstream(line);
int pos=0; int pos=0;
while ( splitstream ) { while ( splitstream )
{
string s; string s;
if ( !getline(splitstream, s, separator[0])) if ( !getline(splitstream, s, separator[0]))
break; break;
@ -154,12 +162,15 @@ bool Ascii::ReadHeader(bool useCached) {
//printf("Updating fields from description %s\n", line.c_str()); //printf("Updating fields from description %s\n", line.c_str());
columnMap.clear(); columnMap.clear();
for ( unsigned int i = 0; i < num_fields; i++ ) { for ( unsigned int i = 0; i < num_fields; i++ )
{
const Field* field = fields[i]; const Field* field = fields[i];
map<string, uint32_t>::iterator fit = ifields.find(field->name); map<string, uint32_t>::iterator fit = ifields.find(field->name);
if ( fit == ifields.end() ) { if ( fit == ifields.end() )
if ( field->optional ) { {
if ( field->optional )
{
// we do not really need this field. mark it as not present and always send an undef back. // we do not really need this field. mark it as not present and always send an undef back.
FieldMapping f(field->name, field->type, field->subtype, -1); FieldMapping f(field->name, field->type, field->subtype, -1);
f.present = false; f.present = false;
@ -173,9 +184,11 @@ bool Ascii::ReadHeader(bool useCached) {
FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]); FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]);
if ( field->secondary_name != "" ) { if ( field->secondary_name != "" )
{
map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name); map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name);
if ( fit2 == ifields.end() ) { if ( fit2 == ifields.end() )
{
Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str())); Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str()));
return false; return false;
} }
@ -187,30 +200,32 @@ bool Ascii::ReadHeader(bool useCached) {
// well, that seems to have worked... // well, that seems to have worked...
return true; return true;
}
bool Ascii::GetLine(string& str) {
while ( getline(*file, str) ) {
if ( str[0] != '#' ) {
return true;
} }
if ( str.compare(0,8, "#fields\t") == 0 ) { bool Ascii::GetLine(string& str)
{
while ( getline(*file, str) )
{
if ( str[0] != '#' )
return true;
if ( str.compare(0,8, "#fields\t") == 0 )
{
str = str.substr(8); str = str.substr(8);
return true; return true;
} }
} }
return false; return false;
}
Value* Ascii::EntryToVal(string s, FieldMapping field) {
if ( s.compare(unset_field) == 0 ) { // field is not set...
return new Value(field.type, false);
} }
Value* Ascii::EntryToVal(string s, FieldMapping field)
{
if ( s.compare(unset_field) == 0 ) // field is not set...
return new Value(field.type, false);
Value* val = new Value(field.type, true); Value* val = new Value(field.type, true);
switch ( field.type ) { switch ( field.type ) {
@ -220,11 +235,12 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
break; break;
case TYPE_BOOL: case TYPE_BOOL:
if ( s == "T" ) { if ( s == "T" )
val->val.int_val = 1; val->val.int_val = 1;
} else if ( s == "F" ) { else if ( s == "F" )
val->val.int_val = 0; val->val.int_val = 0;
} else { else
{
Error(Fmt("Field: %s Invalid value for boolean: %s", field.name.c_str(), s.c_str())); Error(Fmt("Field: %s Invalid value for boolean: %s", field.name.c_str(), s.c_str()));
return false; return false;
} }
@ -250,7 +266,8 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
val->val.port_val.proto = TRANSPORT_UNKNOWN; val->val.port_val.proto = TRANSPORT_UNKNOWN;
break; break;
case TYPE_SUBNET: { case TYPE_SUBNET:
{
size_t pos = s.find("/"); size_t pos = s.find("/");
if ( pos == s.npos ) { if ( pos == s.npos ) {
Error(Fmt("Invalid value for subnet: %s", s.c_str())); Error(Fmt("Invalid value for subnet: %s", s.c_str()));
@ -261,8 +278,8 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
val->val.subnet_val.prefix = StringToAddr(addr); val->val.subnet_val.prefix = StringToAddr(addr);
val->val.subnet_val.length = width; val->val.subnet_val.length = width;
}
break; break;
}
case TYPE_ADDR: case TYPE_ADDR:
val->val.addr_val = StringToAddr(s); val->val.addr_val = StringToAddr(s);
@ -287,13 +304,18 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
Value** lvals = new Value* [length]; Value** lvals = new Value* [length];
if ( field.type == TYPE_TABLE ) { if ( field.type == TYPE_TABLE )
{
val->val.set_val.vals = lvals; val->val.set_val.vals = lvals;
val->val.set_val.size = length; val->val.set_val.size = length;
} else if ( field.type == TYPE_VECTOR ) { }
else if ( field.type == TYPE_VECTOR )
{
val->val.vector_val.vals = lvals; val->val.vector_val.vals = lvals;
val->val.vector_val.size = length; val->val.vector_val.size = length;
} else { }
else
{
assert(false); assert(false);
} }
@ -301,30 +323,34 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
break; //empty break; //empty
istringstream splitstream(s); istringstream splitstream(s);
while ( splitstream ) { while ( splitstream )
{
string element; string element;
if ( !getline(splitstream, element, set_separator[0]) ) if ( !getline(splitstream, element, set_separator[0]) )
break; break;
if ( pos >= length ) { if ( pos >= length )
Error(Fmt("Internal error while parsing set. pos %d >= length %d. Element: %s", pos, length, element.c_str())); {
Error(Fmt("Internal error while parsing set. pos %d >= length %d."
" Element: %s", pos, length, element.c_str()));
break; break;
} }
Value* newval = EntryToVal(element, field.subType()); Value* newval = EntryToVal(element, field.subType());
if ( newval == 0 ) { if ( newval == 0 )
{
Error("Error while reading set"); Error("Error while reading set");
return 0; return 0;
} }
lvals[pos] = newval; lvals[pos] = newval;
pos++; pos++;
} }
if ( pos != length ) { if ( pos != length )
{
Error("Internal error while parsing set: did not find all elements"); Error("Internal error while parsing set: did not find all elements");
return 0; return 0;
} }
@ -340,24 +366,23 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
} }
return val; return val;
}
}
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() { bool Ascii::DoUpdate()
{
switch ( mode ) { switch ( mode ) {
case REREAD: case REREAD:
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) { if ( stat(fname.c_str(), &sb) == -1 )
{
Error(Fmt("Could not get stat for %s", fname.c_str())); Error(Fmt("Could not get stat for %s", fname.c_str()));
return false; return false;
} }
if ( sb.st_mtime <= mtime ) { if ( sb.st_mtime <= mtime ) // no change
// no change
return true; return true;
}
mtime = sb.st_mtime; mtime = sb.st_mtime;
// file changed. reread. // file changed. reread.
@ -366,26 +391,30 @@ bool Ascii::DoUpdate() {
case MANUAL: case MANUAL:
case STREAM: case STREAM:
// dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad) // dirty, fix me. (well, apparently after trying seeking, etc
if ( file && file->is_open() ) { // - this is not that bad)
if ( mode == STREAM ) { if ( file && file->is_open() )
{
if ( mode == STREAM )
{
file->clear(); // remove end of file evil bits file->clear(); // remove end of file evil bits
if ( !ReadHeader(true) ) // in case filters changed if ( !ReadHeader(true) ) // in case filters changed
{
return false; // header reading failed return false; // header reading failed
}
break; break;
} }
file->close(); file->close();
} }
file = new ifstream(fname.c_str()); file = new ifstream(fname.c_str());
if ( !file->is_open() ) { if ( !file->is_open() )
{
Error(Fmt("cannot open %s", fname.c_str())); Error(Fmt("cannot open %s", fname.c_str()));
return false; return false;
} }
if ( ReadHeader(false) == false ) { if ( ReadHeader(false) == false )
{
return false; return false;
} }
@ -395,21 +424,16 @@ bool Ascii::DoUpdate() {
} }
//
// file->seekg(0, ios::beg); // do not forget clear.
string line; string line;
while ( GetLine(line ) ) { while ( GetLine(line ) )
{
// split on tabs // split on tabs
istringstream splitstream(line); istringstream splitstream(line);
map<int, string> stringfields; map<int, string> stringfields;
int pos = 0; int pos = 0;
while ( splitstream ) { while ( splitstream )
{
string s; string s;
if ( !getline(splitstream, s, separator[0]) ) if ( !getline(splitstream, s, separator[0]) )
break; break;
@ -426,9 +450,11 @@ bool Ascii::DoUpdate() {
int fpos = 0; int fpos = 0;
for ( vector<FieldMapping>::iterator fit = columnMap.begin(); for ( vector<FieldMapping>::iterator fit = columnMap.begin();
fit != columnMap.end(); fit != columnMap.end();
fit++ ){ fit++ )
{
if ( ! fit->present ) { if ( ! fit->present )
{
// add non-present field // add non-present field
fields[fpos] = new Value((*fit).type, false); fields[fpos] = new Value((*fit).type, false);
fpos++; fpos++;
@ -437,18 +463,21 @@ bool Ascii::DoUpdate() {
assert(fit->position >= 0 ); assert(fit->position >= 0 );
if ( (*fit).position > pos || (*fit).secondary_position > pos ) { if ( (*fit).position > pos || (*fit).secondary_position > pos )
{
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position)); Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position));
return false; return false;
} }
Value* val = EntryToVal(stringfields[(*fit).position], *fit); Value* val = EntryToVal(stringfields[(*fit).position], *fit);
if ( val == 0 ) { if ( val == 0 )
{
Error("Could not convert String value to Val"); Error("Could not convert String value to Val");
return false; return false;
} }
if ( (*fit).secondary_position != -1 ) { if ( (*fit).secondary_position != -1 )
{
// we have a port definition :) // we have a port definition :)
assert(val->type == TYPE_PORT ); assert(val->type == TYPE_PORT );
// Error(Fmt("Got type %d != PORT with secondary position!", val->type)); // Error(Fmt("Got type %d != PORT with secondary position!", val->type));
@ -464,31 +493,17 @@ bool Ascii::DoUpdate() {
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields); //printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
assert ( (unsigned int) fpos == num_fields ); assert ( (unsigned int) fpos == num_fields );
if ( mode == STREAM ) { if ( mode == STREAM )
Put(fields); Put(fields);
} else { else
SendEntry(fields); SendEntry(fields);
} }
/* Do not do this, ownership changes to other thread if ( mode != STREAM )
* for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
delete fields[i];
}
delete [] fields;
*/
}
//file->clear(); // remove end of file evil bits
//file->seekg(0, ios::beg); // and seek to start.
if ( mode != STREAM ) {
EndCurrentSend(); EndCurrentSend();
}
return true; return true;
} }
bool Ascii::DoHeartbeat(double network_time, double current_time) bool Ascii::DoHeartbeat(double network_time, double current_time)
{ {
@ -500,12 +515,13 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
break; break;
case REREAD: case REREAD:
case STREAM: case STREAM:
Update(); // call update and not DoUpdate, because update actually checks disabled. Update(); // call update and not DoUpdate, because update
// checks disabled.
break; break;
default: default:
assert(false); assert(false);
} }
return true; return true;
} }

View file

@ -22,7 +22,7 @@ using threading::Field;
Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend) Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
multiplication_factor = double(BifConst::InputBenchmark::factor); multiplication_factor = double(BifConst::InputBenchmark::factor);
autospread = double(BifConst::InputBenchmark::autospread); autospread = double(BifConst::InputBenchmark::autospread);
spread = int(BifConst::InputBenchmark::spread); spread = int(BifConst::InputBenchmark::spread);
@ -32,19 +32,19 @@ Benchmark::Benchmark(ReaderFrontend *frontend) : ReaderBackend(frontend)
timedspread = double(BifConst::InputBenchmark::timedspread); timedspread = double(BifConst::InputBenchmark::timedspread);
heart_beat_interval = double(BifConst::Threading::heart_beat_interval); heart_beat_interval = double(BifConst::Threading::heart_beat_interval);
} }
Benchmark::~Benchmark() Benchmark::~Benchmark()
{ {
DoClose(); DoClose();
} }
void Benchmark::DoClose() void Benchmark::DoClose()
{ {
} }
bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
{ {
mode = arg_mode; mode = arg_mode;
num_fields = arg_num_fields; num_fields = arg_num_fields;
@ -54,7 +54,8 @@ bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Fiel
if ( autospread != 0.0 ) if ( autospread != 0.0 )
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) ); autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) )
{
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str())); Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false; return false;
} }
@ -63,9 +64,10 @@ bool Benchmark::DoInit(string path, int arg_mode, int arg_num_fields, const Fiel
DoUpdate(); DoUpdate();
return true; return true;
} }
string Benchmark::RandomString(const int len) { string Benchmark::RandomString(const int len)
{
string s(len, ' '); string s(len, ' ');
static const char values[] = static const char values[] =
@ -73,38 +75,39 @@ string Benchmark::RandomString(const int len) {
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"; "abcdefghijklmnopqrstuvwxyz";
for (int i = 0; i < len; ++i) { for (int i = 0; i < len; ++i)
s[i] = values[rand() / (RAND_MAX / sizeof(values))]; s[i] = values[rand() / (RAND_MAX / sizeof(values))];
}
return s; return s;
} }
double Benchmark::CurrTime() { double Benchmark::CurrTime()
{
struct timeval tv; struct timeval tv;
assert ( gettimeofday(&tv, 0) >= 0 ); assert ( gettimeofday(&tv, 0) >= 0 );
return double(tv.tv_sec) + double(tv.tv_usec) / 1e6; return double(tv.tv_sec) + double(tv.tv_usec) / 1e6;
} }
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Benchmark::DoUpdate() { bool Benchmark::DoUpdate()
{
int linestosend = num_lines * heart_beat_interval; int linestosend = num_lines * heart_beat_interval;
for ( int i = 0; i < linestosend; i++ ) { for ( int i = 0; i < linestosend; i++ )
{
Value** field = new Value*[num_fields]; Value** field = new Value*[num_fields];
for (unsigned int j = 0; j < num_fields; j++ ) { for (unsigned int j = 0; j < num_fields; j++ )
field[j] = EntryToVal(fields[j]->type, fields[j]->subtype); field[j] = EntryToVal(fields[j]->type, fields[j]->subtype);
}
if ( mode == STREAM ) { if ( mode == STREAM )
// do not do tracking, spread out elements over the second that we have... // do not do tracking, spread out elements over the second that we have...
Put(field); Put(field);
} else { else
SendEntry(field); SendEntry(field);
}
if ( stopspreadat == 0 || num_lines < stopspreadat ) { if ( stopspreadat == 0 || num_lines < stopspreadat )
{
if ( spread != 0 ) if ( spread != 0 )
usleep(spread); usleep(spread);
@ -112,26 +115,25 @@ bool Benchmark::DoUpdate() {
usleep( autospread_time ); usleep( autospread_time );
} }
if ( timedspread != 0.0 ) { if ( timedspread != 0.0 )
{
double diff; double diff;
do { do
diff = CurrTime() - heartbeatstarttime; diff = CurrTime() - heartbeatstarttime;
//printf("%d %f\n", i, diff); while ( diff/heart_beat_interval < i/(linestosend
//} while ( diff < i/threading::Manager::HEART_BEAT_INTERVAL*(num_lines + (num_lines * timedspread) ) ); + (linestosend * timedspread) ) );
} while ( diff/heart_beat_interval < i/(linestosend + (linestosend * timedspread) ) );
//} while ( diff < 0.8);
} }
} }
if ( mode != STREAM ) { if ( mode != STREAM )
EndCurrentSend(); EndCurrentSend();
}
return true; return true;
} }
threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) { threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype)
{
Value* val = new Value(type, true); Value* val = new Value(type, true);
// basically construct something random from the fields that we want. // basically construct something random from the fields that we want.
@ -170,7 +172,8 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) {
val->val.port_val.proto = TRANSPORT_UNKNOWN; val->val.port_val.proto = TRANSPORT_UNKNOWN;
break; break;
case TYPE_SUBNET: { case TYPE_SUBNET:
{
val->val.subnet_val.prefix = StringToAddr("192.168.17.1"); val->val.subnet_val.prefix = StringToAddr("192.168.17.1");
val->val.subnet_val.length = 16; val->val.subnet_val.length = 16;
} }
@ -192,23 +195,27 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) {
Value** lvals = new Value* [length]; Value** lvals = new Value* [length];
if ( type == TYPE_TABLE ) { if ( type == TYPE_TABLE )
{
val->val.set_val.vals = lvals; val->val.set_val.vals = lvals;
val->val.set_val.size = length; val->val.set_val.size = length;
} else if ( type == TYPE_VECTOR ) { }
else if ( type == TYPE_VECTOR )
{
val->val.vector_val.vals = lvals; val->val.vector_val.vals = lvals;
val->val.vector_val.size = length; val->val.vector_val.size = length;
} else {
assert(false);
} }
else
assert(false);
if ( length == 0 ) if ( length == 0 )
break; //empty break; //empty
for ( unsigned int pos = 0; pos < length; pos++ ) { for ( unsigned int pos = 0; pos < length; pos++ )
{
Value* newval = EntryToVal(subtype, TYPE_ENUM); Value* newval = EntryToVal(subtype, TYPE_ENUM);
if ( newval == 0 ) { if ( newval == 0 )
{
Error("Error while reading set"); Error("Error while reading set");
return 0; return 0;
} }
@ -226,20 +233,11 @@ threading::Value* Benchmark::EntryToVal(TypeTag type, TypeTag subtype) {
return val; return val;
} }
bool Benchmark::DoHeartbeat(double network_time, double current_time) bool Benchmark::DoHeartbeat(double network_time, double current_time)
{ {
/*
* This does not work the way I envisioned it, because the queueing is the problem.
printf("%f\n", CurrTime() - current_time);
if ( CurrTime() - current_time > 0.25 ) {
// event has hung for a time. refuse.
SendEvent("EndBenchmark", 0, 0);
return true;
} */
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
num_lines = (int) ( (double) num_lines*multiplication_factor); num_lines = (int) ( (double) num_lines*multiplication_factor);
num_lines += add; num_lines += add;
@ -251,7 +249,8 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
break; break;
case REREAD: case REREAD:
case STREAM: case STREAM:
if ( multiplication_factor != 1 || add != 0 ) { if ( multiplication_factor != 1 || add != 0 )
{
// we have to document at what time we changed the factor to what value. // we have to document at what time we changed the factor to what value.
Value** v = new Value*[2]; Value** v = new Value*[2];
v[0] = new Value(TYPE_COUNT, true); v[0] = new Value(TYPE_COUNT, true);
@ -262,10 +261,9 @@ bool Benchmark::DoHeartbeat(double network_time, double current_time)
SendEvent("lines_changed", 2, v); SendEvent("lines_changed", 2, v);
} }
if ( autospread != 0.0 ) { if ( autospread != 0.0 )
autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
// because executing this in every loop is apparently too expensive. // because executing this in every loop is apparently too expensive.
} autospread_time = (int) ( (double) 1000000 / (autospread * (double) num_lines) );
Update(); // call update and not DoUpdate, because update actually checks disabled. Update(); // call update and not DoUpdate, because update actually checks disabled.

View file

@ -24,42 +24,44 @@ using threading::Value;
using threading::Field; using threading::Field;
Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
{ {
file = 0; file = 0;
in = 0; in = 0;
//keyMap = new map<string, string>();
separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len()); separator.assign( (const char*) BifConst::InputRaw::record_separator->Bytes(), BifConst::InputRaw::record_separator->Len());
if ( separator.size() != 1 ) { if ( separator.size() != 1 )
Error("separator length has to be 1. Separator will be truncated."); Error("separator length has to be 1. Separator will be truncated.");
} }
}
Raw::~Raw() Raw::~Raw()
{ {
DoClose(); DoClose();
} }
void Raw::DoClose() void Raw::DoClose()
{ {
if ( file != 0 ) { if ( file != 0 )
{
Close(); Close();
} }
} }
bool Raw::Open() bool Raw::Open()
{ {
if ( execute ) { if ( execute )
{
file = popen(fname.c_str(), "r"); file = popen(fname.c_str(), "r");
if ( file == NULL ) { if ( file == NULL )
{
Error(Fmt("Could not execute command %s", fname.c_str())); Error(Fmt("Could not execute command %s", fname.c_str()));
return false; return false;
} }
} else { }
else
{
file = fopen(fname.c_str(), "r"); file = fopen(fname.c_str(), "r");
if ( file == NULL ) { if ( file == NULL )
{
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
return false; return false;
} }
@ -67,24 +69,29 @@ bool Raw::Open()
in = new boost::fdistream(fileno(file)); in = new boost::fdistream(fileno(file));
if ( execute && mode == STREAM ) { if ( execute && mode == STREAM )
{
fcntl(fileno(file), F_SETFL, O_NONBLOCK); fcntl(fileno(file), F_SETFL, O_NONBLOCK);
} }
return true; return true;
} }
bool Raw::Close() bool Raw::Close()
{ {
if ( file == NULL ) { if ( file == NULL )
{
InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str())); InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str()));
return false; return false;
} }
if ( execute ) { if ( execute )
{
delete(in); delete(in);
pclose(file); pclose(file);
} else { }
else
{
delete(in); delete(in);
fclose(file); fclose(file);
} }
@ -93,10 +100,10 @@ bool Raw::Close()
file = NULL; file = NULL;
return true; return true;
} }
bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields) bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
{ {
fname = path; fname = path;
mode = arg_mode; mode = arg_mode;
mtime = 0; mtime = 0;
@ -107,24 +114,30 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
num_fields = arg_num_fields; num_fields = arg_num_fields;
fields = arg_fields; fields = arg_fields;
if ( path.length() == 0 ) { if ( path.length() == 0 )
{
Error("No source path provided"); Error("No source path provided");
return false; return false;
} }
if ( arg_num_fields != 1 ) { if ( arg_num_fields != 1 )
Error("Filter for raw reader contains more than one field. Filters for the raw reader may only contain exactly one string field. Filter ignored."); {
Error("Filter for raw reader contains more than one field. "
"Filters for the raw reader may only contain exactly one string field. "
"Filter ignored.");
return false; return false;
} }
if ( fields[0]->type != TYPE_STRING ) { if ( fields[0]->type != TYPE_STRING )
{
Error("Filter for raw reader contains a field that is not of type string."); Error("Filter for raw reader contains a field that is not of type string.");
return false; return false;
} }
// do Initialization // do Initialization
char last = path[path.length()-1]; char last = path[path.length()-1];
if ( last == '|' ) { if ( last == '|' )
{
execute = true; execute = true;
fname = path.substr(0, fname.length() - 1); fname = path.substr(0, fname.length() - 1);
@ -137,19 +150,17 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
} else { } else {
execute = false; execute = false;
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) { if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) )
{
Error(Fmt("Unsupported read mode %d for source %s", mode, fname.c_str())); Error(Fmt("Unsupported read mode %d for source %s", mode, fname.c_str()));
return false; return false;
} }
result = Open(); result = Open();
} }
if ( result == false ) { if ( result == false )
return result; return result;
}
#ifdef DEBUG #ifdef DEBUG
Debug(DBG_INPUT, "Raw reader created, will perform first update"); Debug(DBG_INPUT, "Raw reader created, will perform first update");
@ -162,53 +173,58 @@ bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* con
Debug(DBG_INPUT, "First update went through"); Debug(DBG_INPUT, "First update went through");
#endif #endif
return true; return true;
}
bool Raw::GetLine(string& str) {
while ( getline(*in, str, separator[0]) ) {
return true;
} }
bool Raw::GetLine(string& str)
{
while ( getline(*in, str, separator[0]) )
return true;
return false; return false;
} }
// read the entire file and send appropriate thingies back to InputMgr // read the entire file and send appropriate thingies back to InputMgr
bool Raw::DoUpdate() { bool Raw::DoUpdate()
if ( firstrun ) { {
if ( firstrun )
firstrun = false; firstrun = false;
} else { else
{
switch ( mode ) { switch ( mode ) {
case REREAD: case REREAD:
{
// check if the file has changed // check if the file has changed
struct stat sb; struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) { if ( stat(fname.c_str(), &sb) == -1 )
{
Error(Fmt("Could not get stat for %s", fname.c_str())); Error(Fmt("Could not get stat for %s", fname.c_str()));
return false; return false;
} }
if ( sb.st_mtime <= mtime ) { if ( sb.st_mtime <= mtime )
// no change // no change
return true; return true;
}
mtime = sb.st_mtime; mtime = sb.st_mtime;
// file changed. reread. // file changed. reread.
// fallthrough // fallthrough
}
case MANUAL: case MANUAL:
case STREAM: case STREAM:
if ( mode == STREAM && file != NULL && in != NULL ) { if ( mode == STREAM && file != NULL && in != NULL )
{
//fpurge(file); //fpurge(file);
in->clear(); // remove end of file evil bits in->clear(); // remove end of file evil bits
break; break;
} }
Close(); Close();
if ( !Open() ) { if ( !Open() )
return false; return false;
}
break; break;
default: default:
assert(false); assert(false);
@ -217,7 +233,8 @@ bool Raw::DoUpdate() {
} }
string line; string line;
while ( GetLine(line) ) { while ( GetLine(line) )
{
assert (num_fields == 1); assert (num_fields == 1);
Value** fields = new Value*[1]; Value** fields = new Value*[1];
@ -231,11 +248,11 @@ bool Raw::DoUpdate() {
} }
return true; return true;
} }
bool Raw::DoHeartbeat(double network_time, double current_time) bool Raw::DoHeartbeat(double network_time, double current_time)
{ {
ReaderBackend::DoHeartbeat(network_time, current_time); ReaderBackend::DoHeartbeat(network_time, current_time);
switch ( mode ) { switch ( mode ) {
@ -244,12 +261,12 @@ bool Raw::DoHeartbeat(double network_time, double current_time)
break; break;
case REREAD: case REREAD:
case STREAM: case STREAM:
Update(); // call update and not DoUpdate, because update actually checks disabled. Update(); // call update and not DoUpdate, because update
// checks disabled.
break; break;
default: default:
assert(false); assert(false);
} }
return true; return true;
} }