it compiles :)

But that's all, not tested, don't expect it to do anything but crash.
This commit is contained in:
Bernhard Amann 2012-02-06 17:37:02 -08:00
parent 238e9545c0
commit 8385d5bb2d
7 changed files with 258 additions and 67 deletions

View file

@ -680,7 +680,7 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu
}
void Manager::SendEntry(const ReaderFrontend* reader, int id, const Value* const *vals) {
void Manager::SendEntry(const ReaderFrontend* reader, const int id, const Value* const *vals) {
ReaderInfo *i = FindReader(reader);
if ( i == 0 ) {
reporter->InternalError("Unknown reader");
@ -703,7 +703,7 @@ void Manager::SendEntry(const ReaderFrontend* reader, int id, const Value* const
}
void Manager::SendEntryTable(const ReaderFrontend* reader, int id, const Value* const *vals) {
void Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) {
ReaderInfo *i = FindReader(reader);
bool updated = false;

View file

@ -32,6 +32,13 @@ public:
protected:
friend class ReaderFrontend;
friend class ErrorMessage;
friend class PutMessage;
friend class DeleteMessage;
friend class ClearMessage;
friend class SendEventMessage;
friend class SendEntryMessage;
friend class EndCurrentSendMessage;
// Reports an error for the given reader.
void Error(ReaderFrontend* reader, const char* msg);
@ -42,11 +49,14 @@ protected:
bool Delete(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
void SendEntry(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
void EndCurrentSend(const ReaderFrontend* reader, int id);
void SendEntry(const ReaderFrontend* reader, const int id, const threading::Value* const *vals);
void EndCurrentSend(const ReaderFrontend* reader, const int id);
bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
ReaderBackend* CreateBackend(ReaderFrontend* frontend, bro_int_t type);
private:
struct ReaderInfo;
@ -60,7 +70,6 @@ private:
void SendEvent(EventHandlerPtr ev, const int numvals, ...);
void SendEvent(EventHandlerPtr ev, list<Val*> events);
bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
HashKey* HashValues(const int num_elements, const threading::Value* const *vals);
int GetValueLength(const threading::Value* val);

View file

@ -2,6 +2,7 @@
#include "ReaderBackend.h"
#include "ReaderFrontend.h"
#include "Manager.h"
using threading::Value;
using threading::Field;
@ -15,28 +16,106 @@ public:
message(message) {}
virtual bool Process() {
input_mgr->Error(object, message.c_str());
input_mgr->Error(Object(), message.c_str());
return true;
}
private:
string message;
}
};
class PutMessage : public threading::OutputMessage<ReaderFrontend> {
public:
PutMessage(ReaderFrontend* reader, int id, const Value* const *val)
: threading::OutputMessage<ReaderFrontend>("Error", reader),
: threading::OutputMessage<ReaderFrontend>("Put", reader),
id(id), val(val) {}
virtual bool Process() {
return input_mgr->Put(object, id, val);
input_mgr->Put(Object(), id, val);
return true;
}
private:
int id;
Value* val;
}
const Value* const *val;
};
class DeleteMessage : public threading::OutputMessage<ReaderFrontend> {
public:
DeleteMessage(ReaderFrontend* reader, int id, const Value* const *val)
: threading::OutputMessage<ReaderFrontend>("Delete", reader),
id(id), val(val) {}
virtual bool Process() {
return input_mgr->Delete(Object(), id, val);
}
private:
int id;
const Value* const *val;
};
class ClearMessage : public threading::OutputMessage<ReaderFrontend> {
public:
ClearMessage(ReaderFrontend* reader, int id)
: threading::OutputMessage<ReaderFrontend>("Clear", reader),
id(id) {}
virtual bool Process() {
input_mgr->Clear(Object(), id);
return true;
}
private:
int id;
};
class SendEventMessage : public threading::OutputMessage<ReaderFrontend> {
public:
SendEventMessage(ReaderFrontend* reader, const string& name, const int num_vals, const Value* const *val)
: threading::OutputMessage<ReaderFrontend>("SendEvent", reader),
name(name), num_vals(num_vals), val(val) {}
virtual bool Process() {
return input_mgr->SendEvent(name, num_vals, val);
}
private:
const string name;
const int num_vals;
const Value* const *val;
};
class SendEntryMessage : public threading::OutputMessage<ReaderFrontend> {
public:
SendEntryMessage(ReaderFrontend* reader, const int id, const Value* const *val)
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
id(id), val(val) {}
virtual bool Process() {
input_mgr->SendEntry(Object(), id, val);
return true;
}
private:
const int id;
const Value* const *val;
};
class EndCurrentSendMessage : public threading::OutputMessage<ReaderFrontend> {
public:
EndCurrentSendMessage(ReaderFrontend* reader, int id)
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
id(id) {}
virtual bool Process() {
input_mgr->EndCurrentSend(Object(), id);
return true;
}
private:
int id;
};
ReaderBackend::ReaderBackend(ReaderFrontend* arg_frontend) : MsgThread()
{
@ -56,37 +135,44 @@ ReaderBackend::~ReaderBackend()
void ReaderBackend::Error(const string &msg)
{
SendOut(new ErrorMessage(frontend, msg);
SendOut(new ErrorMessage(frontend, msg));
}
/*
void ReaderBackend::Error(const char *msg)
{
SendOut(new ErrorMessage(frontend, string(msg)));
} */
void ReaderBackend::Put(int id, const Value* const *val)
{
SendOut(new PutMessage(frontend, id, val);
SendOut(new PutMessage(frontend, id, val));
}
void ReaderBackend::Delete(int id, const Value* const *val)
{
SendOut(new DeleteMessage(frontend, id, val);
SendOut(new DeleteMessage(frontend, id, val));
}
void ReaderBackend::Clear(int id)
{
SendOut(new ClearMessage(frontend, id);
SendOut(new ClearMessage(frontend, id));
}
bool ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals)
void ReaderBackend::SendEvent(const string& name, const int num_vals, const Value* const *vals)
{
SendOut(new SendEventMessage(frontend, name, num_vals, vals);
SendOut(new SendEventMessage(frontend, name, num_vals, vals));
}
void ReaderBackend::EndCurrentSend(int id)
{
SendOut(new EndCurrentSendMessage(frontent, id);
SendOut(new EndCurrentSendMessage(frontend, id));
}
void ReaderBackend::SendEntry(int id, const Value* const *vals)
{
SendOut(new SendEntryMessage(frontend, id, vals);
SendOut(new SendEntryMessage(frontend, id, vals));
}
bool ReaderBackend::Init(string arg_source)

View file

@ -42,7 +42,7 @@ protected:
// Reports an error to the user.
void Error(const string &msg);
void Error(const char *msg);
//void Error(const char *msg);
// The following methods return the information as passed to Init().
const string Source() const { return source; }
@ -50,7 +50,7 @@ protected:
// A thread-safe version of fmt(). (stolen from logwriter)
const char* Fmt(const char* format, ...);
bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
void SendEvent(const string& name, const int num_vals, const threading::Value* const *vals);
// Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table
void Put(int id, const threading::Value* const *val);

View file

@ -1,28 +1,117 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef INPUT_READERFRONTEND_H
#define INPUT_READERFRONTEND_H
#include "Manager.h"
#include "ReaderFrontend.h"
#include "ReaderBackend.h"
#include "threading/MsgThread.h"
namespace logging {
class ReaderBackend;
class ReaderFrontend {
ReaderFrontend(bro_int_t type);
virtual ~ReaderFrontend();
namespace input {
protected:
friend class Manager;
class InitMessage : public threading::InputMessage<ReaderBackend>
{
public:
InitMessage(ReaderBackend* backend, const string source)
: threading::InputMessage<ReaderBackend>("Init", backend),
source(source) { }
virtual bool Process() { return Object()->Init(source); }
private:
const string source;
};
class UpdateMessage : public threading::InputMessage<ReaderBackend>
{
public:
UpdateMessage(ReaderBackend* backend)
: threading::InputMessage<ReaderBackend>("Update", backend)
{ }
virtual bool Process() { return Object()->Update(); }
};
class FinishMessage : public threading::InputMessage<ReaderBackend>
{
public:
FinishMessage(ReaderBackend* backend)
: threading::InputMessage<ReaderBackend>("Finish", backend)
{ }
virtual bool Process() { Object()->Finish(); return true; }
};
class AddFilterMessage : public threading::InputMessage<ReaderBackend>
{
public:
AddFilterMessage(ReaderBackend* backend, const int id, const int num_fields, const threading::Field* const* fields)
: threading::InputMessage<ReaderBackend>("AddFilter", backend),
id(id), num_fields(num_fields), fields(fields) { }
virtual bool Process() { return Object()->AddFilter(id, num_fields, fields); }
private:
const int id;
const int num_fields;
const threading::Field* const* fields;
};
ReaderFrontend::ReaderFrontend(bro_int_t type) {
disabled = initialized = false;
ty_name = "<not set>";
backend = input_mgr->CreateBackend(this, type);
assert(backend);
backend->Start();
}
ReaderFrontend::~ReaderFrontend() {
}
void ReaderFrontend::Init(string arg_source) {
if ( disabled )
return;
if ( initialized )
reporter->InternalError("writer initialize twice");
source = arg_source;
initialized = true;
backend->SendIn(new InitMessage(backend, arg_source));
}
void ReaderFrontend::Update() {
if ( disabled )
return;
backend->SendIn(new UpdateMessage(backend));
}
void ReaderFrontend::Finish() {
if ( disabled )
return;
backend->SendIn(new FinishMessage(backend));
}
void ReaderFrontend::AddFilter(const int id, const int arg_num_fields, const threading::Field* const* fields) {
if ( disabled )
return;
backend->SendIn(new AddFilterMessage(backend, id, arg_num_fields, fields));
}
string ReaderFrontend::Name() const
{
if ( source.size() )
return ty_name;
return ty_name + "/" + source;
}
}
#endif /* INPUT_READERFRONTEND_H */

View file

@ -3,13 +3,12 @@
#ifndef INPUT_READERFRONTEND_H
#define INPUT_READERFRONTEND_H
#include "Manager.h"
#include "threading/MsgThread.h"
#include "../threading/MsgThread.h"
#include "../threading/SerializationTypes.h"
namespace input {
class ReaderBackend;
class Manager;
class ReaderFrontend {
public:
@ -21,7 +20,7 @@ public:
void Update();
void AddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
void AddFilter( const int id, const int arg_num_fields, const threading::Field* const* fields );
void Finish();
@ -33,16 +32,18 @@ public:
*/
string Name() const;
protected:
friend class Manager;
const string Source() const { return source; }
const string Source() const { return source; };
string ty_name; // Name of the backend type. Set by the manager.
private:
ReaderBackend* backend; // The backend we have instanatiated.
string source;
bool disabled; // True if disabled.
bool initialized; // True if initialized.
};

View file

@ -1,11 +1,17 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "InputReaderAscii.h"
#include "DebugLogger.h"
#include "Ascii.h"
#include "NetVar.h"
#include <fstream>
#include <sstream>
#include "../../threading/SerializationTypes.h"
using namespace input::reader;
using threading::Value;
using threading::Field;
FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position)
: name(arg_name), type(arg_type)
{
@ -31,7 +37,7 @@ FieldMapping FieldMapping::subType() {
return FieldMapping(name, subtype, position);
}
InputReaderAscii::InputReaderAscii()
Ascii::Ascii(ReaderFrontend *frontend) : ReaderBackend(frontend)
{
file = 0;
@ -53,13 +59,13 @@ InputReaderAscii::InputReaderAscii()
}
InputReaderAscii::~InputReaderAscii()
Ascii::~Ascii()
{
DoFinish();
}
void InputReaderAscii::DoFinish()
void Ascii::DoFinish()
{
filters.empty();
if ( file != 0 ) {
@ -69,7 +75,7 @@ void InputReaderAscii::DoFinish()
}
}
bool InputReaderAscii::DoInit(string path)
bool Ascii::DoInit(string path)
{
fname = path;
@ -82,7 +88,7 @@ bool InputReaderAscii::DoInit(string path)
return true;
}
bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) {
bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) {
if ( HasFilter(id) ) {
return false; // no, we don't want to add this a second time
}
@ -96,7 +102,7 @@ bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField*
return true;
}
bool InputReaderAscii::DoRemoveFilter ( int id ) {
bool Ascii::DoRemoveFilter ( int id ) {
if (!HasFilter(id) ) {
return false;
}
@ -107,7 +113,7 @@ bool InputReaderAscii::DoRemoveFilter ( int id ) {
}
bool InputReaderAscii::HasFilter(int id) {
bool Ascii::HasFilter(int id) {
map<int, Filter>::iterator it = filters.find(id);
if ( it == filters.end() ) {
return false;
@ -116,7 +122,7 @@ bool InputReaderAscii::HasFilter(int id) {
}
bool InputReaderAscii::ReadHeader() {
bool Ascii::ReadHeader() {
// try to read the header line...
string line;
if ( !GetLine(line) ) {
@ -142,7 +148,7 @@ bool InputReaderAscii::ReadHeader() {
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
const LogField* field = (*it).second.fields[i];
const Field* field = (*it).second.fields[i];
map<string, uint32_t>::iterator fit = fields.find(field->name);
if ( fit == fields.end() ) {
@ -169,7 +175,7 @@ bool InputReaderAscii::ReadHeader() {
return true;
}
bool InputReaderAscii::GetLine(string& str) {
bool Ascii::GetLine(string& str) {
while ( getline(*file, str) ) {
if ( str[0] != '#' ) {
return true;
@ -184,7 +190,7 @@ bool InputReaderAscii::GetLine(string& str) {
return false;
}
TransportProto InputReaderAscii::StringToProto(const string &proto) {
TransportProto Ascii::StringToProto(const string &proto) {
if ( proto == "unknown" ) {
return TRANSPORT_UNKNOWN;
} else if ( proto == "tcp" ) {
@ -202,12 +208,12 @@ TransportProto InputReaderAscii::StringToProto(const string &proto) {
return TRANSPORT_UNKNOWN;
}
LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) {
Value* Ascii::EntryToVal(string s, FieldMapping field) {
LogVal* val = new LogVal(field.type, true);
Value* val = new Value(field.type, true);
if ( s.compare(unset_field) == 0 ) { // field is not set...
return new LogVal(field.type, false);
return new Value(field.type, false);
}
switch ( field.type ) {
@ -306,7 +312,7 @@ LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) {
if ( s.compare(empty_field) == 0 )
length = 0;
LogVal** lvals = new LogVal* [length];
Value** lvals = new Value* [length];
if ( field.type == TYPE_TABLE ) {
val->val.set_val.vals = lvals;
@ -333,7 +339,7 @@ LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) {
break;
}
LogVal* newval = EntryToVal(element, field.subType());
Value* newval = EntryToVal(element, field.subType());
if ( newval == 0 ) {
Error("Error while reading set");
return 0;
@ -365,7 +371,7 @@ LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) {
}
// read the entire file and send appropriate thingies back to InputMgr
bool InputReaderAscii::DoUpdate() {
bool Ascii::DoUpdate() {
// dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad)
if ( file && file->is_open() ) {
@ -405,7 +411,7 @@ bool InputReaderAscii::DoUpdate() {
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
LogVal** fields = new LogVal*[(*it).second.num_fields];
Value** fields = new Value*[(*it).second.num_fields];
int fpos = 0;
for ( vector<FieldMapping>::iterator fit = (*it).second.columnMap.begin();
@ -417,7 +423,7 @@ bool InputReaderAscii::DoUpdate() {
return false;
}
LogVal* val = EntryToVal(stringfields[(*fit).position], *fit);
Value* val = EntryToVal(stringfields[(*fit).position], *fit);
if ( val == 0 ) {
return false;
}