mirror of
https://github.com/zeek/zeek.git
synced 2025-10-06 16:48:19 +00:00
completely change interface again.
compiles, not really tested. basic test works 70% of the time, coredumps in the other 30 - but was not easy to debug on a first glance (most interestingly the crash happens in the logging framework - I wonder how that works). Other tests are not adjusted to the new interface yet.
This commit is contained in:
parent
b4e6971aab
commit
57ffe1be77
14 changed files with 403 additions and 1072 deletions
|
@ -5,15 +5,15 @@ module Input;
|
||||||
|
|
||||||
export {
|
export {
|
||||||
|
|
||||||
redef enum Input::ID += { TABLE_READ, EVENT_READ };
|
|
||||||
|
|
||||||
## The default input reader used. Defaults to `READER_ASCII`.
|
## The default input reader used. Defaults to `READER_ASCII`.
|
||||||
const default_reader = READER_ASCII &redef;
|
const default_reader = READER_ASCII &redef;
|
||||||
|
|
||||||
const default_mode = MANUAL &redef;
|
const default_mode = MANUAL &redef;
|
||||||
|
|
||||||
## Stream decription type used for the `create_stream` method
|
## TableFilter description type used for the `table` method.
|
||||||
type StreamDescription: record {
|
type TableDescription: record {
|
||||||
|
## Common definitions for tables and events
|
||||||
|
|
||||||
## String that allows the reader to find the source.
|
## String that allows the reader to find the source.
|
||||||
## For `READER_ASCII`, this is the filename.
|
## For `READER_ASCII`, this is the filename.
|
||||||
source: string;
|
source: string;
|
||||||
|
@ -26,13 +26,12 @@ export {
|
||||||
|
|
||||||
## Automatically start the input stream after the first filter has been added
|
## Automatically start the input stream after the first filter has been added
|
||||||
autostart: bool &default=T;
|
autostart: bool &default=T;
|
||||||
};
|
|
||||||
|
|
||||||
## TableFilter description type used for the `add_tablefilter` method.
|
|
||||||
type TableFilter: record {
|
|
||||||
## Descriptive name. Used to remove a filter at a later time
|
## Descriptive name. Used to remove a filter at a later time
|
||||||
name: string;
|
name: string;
|
||||||
|
|
||||||
|
## Special definitions for tables
|
||||||
|
|
||||||
## Table which will contain the data read by the input framework
|
## Table which will contain the data read by the input framework
|
||||||
destination: any;
|
destination: any;
|
||||||
## Record that defines the values used as the index of the table
|
## Record that defines the values used as the index of the table
|
||||||
|
@ -55,11 +54,28 @@ export {
|
||||||
pred: function(typ: Input::Event, left: any, right: any): bool &optional;
|
pred: function(typ: Input::Event, left: any, right: any): bool &optional;
|
||||||
};
|
};
|
||||||
|
|
||||||
## EventFilter description type used for the `add_eventfilter` method.
|
## EventFilter description type used for the `event` method.
|
||||||
type EventFilter: record {
|
type EventDescription: record {
|
||||||
## Descriptive name. Used to remove a filter at a later time
|
## Common definitions for tables and events
|
||||||
name: string;
|
|
||||||
|
## String that allows the reader to find the source.
|
||||||
|
## For `READER_ASCII`, this is the filename.
|
||||||
|
source: string;
|
||||||
|
|
||||||
|
## Reader to use for this steam
|
||||||
|
reader: Reader &default=default_reader;
|
||||||
|
|
||||||
|
## Read mode to use for this stream
|
||||||
|
mode: Mode &default=default_mode;
|
||||||
|
|
||||||
|
## Automatically start the input stream after the first filter has been added
|
||||||
|
autostart: bool &default=T;
|
||||||
|
|
||||||
|
## Descriptive name. Used to remove a filter at a later time
|
||||||
|
name: string;
|
||||||
|
|
||||||
|
## Special definitions for events
|
||||||
|
|
||||||
## Record describing the fields to be retrieved from the source input.
|
## Record describing the fields to be retrieved from the source input.
|
||||||
fields: any;
|
fields: any;
|
||||||
## If want_record if false (default), the event receives each value in fields as a seperate argument.
|
## If want_record if false (default), the event receives each value in fields as a seperate argument.
|
||||||
|
@ -72,61 +88,29 @@ export {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#const no_filter: Filter = [$name="<not found>", $idx="", $val="", $destination=""]; # Sentinel.
|
## Create a new table input from a given source. Returns true on success.
|
||||||
|
|
||||||
## Create a new input stream from a given source. Returns true on success.
|
|
||||||
##
|
##
|
||||||
## id: `Input::ID` enum value identifying this stream
|
## description: `TableDescription` record describing the source.
|
||||||
## description: `StreamDescription` record describing the source.
|
global add_table: function(description: Input::TableDescription) : bool;
|
||||||
global create_stream: function(id: Input::ID, description: Input::StreamDescription) : bool;
|
|
||||||
|
## Create a new event input from a given source. Returns true on success.
|
||||||
## Remove a current input stream. Returns true on success.
|
|
||||||
##
|
##
|
||||||
## id: `Input::ID` enum value identifying the stream to be removed
|
## description: `TableDescription` record describing the source.
|
||||||
global remove_stream: function(id: Input::ID) : bool;
|
global add_event: function(description: Input::EventDescription) : bool;
|
||||||
|
|
||||||
|
## Remove a input stream. Returns true on success and false if the named stream was not found.
|
||||||
|
##
|
||||||
|
## id: string value identifying the stream to be removed
|
||||||
|
global remove: function(id: string) : bool;
|
||||||
|
|
||||||
## Forces the current input to be checked for changes.
|
## Forces the current input to be checked for changes.
|
||||||
|
## Returns true on success and false if the named stream was not found
|
||||||
##
|
##
|
||||||
## id: `Input::ID` enum value identifying the stream
|
## id: string value identifying the stream
|
||||||
global force_update: function(id: Input::ID) : bool;
|
global force_update: function(id: string) : bool;
|
||||||
|
|
||||||
## Adds a table filter to a specific input stream. Returns true on success.
|
|
||||||
##
|
|
||||||
## id: `Input::ID` enum value identifying the stream
|
|
||||||
## filter: the `TableFilter` record describing the filter.
|
|
||||||
global add_tablefilter: function(id: Input::ID, filter: Input::TableFilter) : bool;
|
|
||||||
|
|
||||||
## Removes a named table filter to a specific input stream. Returns true on success.
|
|
||||||
##
|
|
||||||
## id: `Input::ID` enum value identifying the stream
|
|
||||||
## name: the name of the filter to be removed.
|
|
||||||
global remove_tablefilter: function(id: Input::ID, name: string) : bool;
|
|
||||||
|
|
||||||
## Adds an event filter to a specific input stream. Returns true on success.
|
|
||||||
##
|
|
||||||
## id: `Input::ID` enum value identifying the stream
|
|
||||||
## filter: the `EventFilter` record describing the filter.
|
|
||||||
global add_eventfilter: function(id: Input::ID, filter: Input::EventFilter) : bool;
|
|
||||||
|
|
||||||
## Removes a named event filter to a specific input stream. Returns true on success.
|
|
||||||
##
|
|
||||||
## id: `Input::ID` enum value identifying the stream
|
|
||||||
## name: the name of the filter to be removed.
|
|
||||||
global remove_eventfilter: function(id: Input::ID, name: string) : bool;
|
|
||||||
#global get_filter: function(id: ID, name: string) : Filter;
|
|
||||||
|
|
||||||
## Convenience function for reading a specific input source exactly once using
|
|
||||||
## exactly one tablefilter
|
|
||||||
##
|
|
||||||
## id: `Input::ID` enum value identifying the stream
|
|
||||||
## description: `StreamDescription` record describing the source.
|
|
||||||
## filter: the `TableFilter` record describing the filter.
|
|
||||||
global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool;
|
|
||||||
|
|
||||||
global read_event: function(description: Input::StreamDescription, filter: Input::EventFilter) : bool;
|
|
||||||
|
|
||||||
global update_finished: event(id: Input::ID);
|
|
||||||
|
|
||||||
|
## Event that is called, when the update of a specific source is finished
|
||||||
|
global update_finished: event(id: string);
|
||||||
}
|
}
|
||||||
|
|
||||||
@load base/input.bif
|
@load base/input.bif
|
||||||
|
@ -134,90 +118,26 @@ export {
|
||||||
|
|
||||||
module Input;
|
module Input;
|
||||||
|
|
||||||
#global filters: table[ID, string] of Filter;
|
#global streams: table[string] of Filter;
|
||||||
|
# ^ change to set containing the names
|
||||||
|
|
||||||
function create_stream(id: Input::ID, description: Input::StreamDescription) : bool
|
function add_table(description: Input::TableDescription) : bool
|
||||||
{
|
{
|
||||||
return __create_stream(id, description);
|
return __create_table_stream(description);
|
||||||
}
|
}
|
||||||
|
|
||||||
function remove_stream(id: Input::ID) : bool
|
function add_event(description: Input::EventDescription) : bool
|
||||||
|
{
|
||||||
|
return __create_event_stream(description);
|
||||||
|
}
|
||||||
|
|
||||||
|
function remove(id: string) : bool
|
||||||
{
|
{
|
||||||
return __remove_stream(id);
|
return __remove_stream(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
function force_update(id: Input::ID) : bool
|
function force_update(id: string) : bool
|
||||||
{
|
{
|
||||||
return __force_update(id);
|
return __force_update(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
function add_tablefilter(id: Input::ID, filter: Input::TableFilter) : bool
|
|
||||||
{
|
|
||||||
# filters[id, filter$name] = filter;
|
|
||||||
return __add_tablefilter(id, filter);
|
|
||||||
}
|
|
||||||
|
|
||||||
function remove_tablefilter(id: Input::ID, name: string) : bool
|
|
||||||
{
|
|
||||||
# delete filters[id, name];
|
|
||||||
return __remove_tablefilter(id, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
function add_eventfilter(id: Input::ID, filter: Input::EventFilter) : bool
|
|
||||||
{
|
|
||||||
# filters[id, filter$name] = filter;
|
|
||||||
return __add_eventfilter(id, filter);
|
|
||||||
}
|
|
||||||
|
|
||||||
function remove_eventfilter(id: Input::ID, name: string) : bool
|
|
||||||
{
|
|
||||||
# delete filters[id, name];
|
|
||||||
return __remove_eventfilter(id, name);
|
|
||||||
}
|
|
||||||
|
|
||||||
function read_table(description: Input::StreamDescription, filter: Input::TableFilter) : bool {
|
|
||||||
local ok: bool = T;
|
|
||||||
# since we create and delete it ourselves this should be ok... at least for singlethreaded operation
|
|
||||||
local id: Input::ID = Input::TABLE_READ;
|
|
||||||
|
|
||||||
ok = create_stream(id, description);
|
|
||||||
if ( ok ) {
|
|
||||||
ok = add_tablefilter(id, filter);
|
|
||||||
}
|
|
||||||
if ( ok ) {
|
|
||||||
ok = remove_tablefilter(id, filter$name);
|
|
||||||
}
|
|
||||||
if ( ok ) {
|
|
||||||
ok = remove_stream(id);
|
|
||||||
} else {
|
|
||||||
remove_stream(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
return ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
function read_event(description: Input::StreamDescription, filter: Input::EventFilter) : bool {
|
|
||||||
local ok: bool = T;
|
|
||||||
# since we create and delete it ourselves this should be ok... at least for singlethreaded operation
|
|
||||||
local id: Input::ID = Input::EVENT_READ;
|
|
||||||
|
|
||||||
ok = create_stream(id, description);
|
|
||||||
if ( ok ) {
|
|
||||||
ok = add_eventfilter(id, filter);
|
|
||||||
}
|
|
||||||
if ( ok ) {
|
|
||||||
ok = remove_stream(id);
|
|
||||||
} else {
|
|
||||||
remove_stream(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
return ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
#function get_filter(id: ID, name: string) : Filter
|
|
||||||
# {
|
|
||||||
# if ( [id, name] in filters )
|
|
||||||
# return filters[id, name];
|
|
||||||
#
|
|
||||||
# return no_filter;
|
|
||||||
# }
|
|
||||||
|
|
|
@ -7,52 +7,33 @@ module Input;
|
||||||
#include "NetVar.h"
|
#include "NetVar.h"
|
||||||
%%}
|
%%}
|
||||||
|
|
||||||
type StreamDescription: record;
|
type TableDescription: record;
|
||||||
type TableFilter: record;
|
type EventDescription: record;
|
||||||
type EventFilter: record;
|
|
||||||
|
|
||||||
function Input::__create_stream%(id: Input::ID, description: Input::StreamDescription%) : bool
|
function Input::__create_table_stream%(description: Input::TableDescription%) : bool
|
||||||
%{
|
%{
|
||||||
input::ReaderFrontend *the_reader = input_mgr->CreateStream(id->AsEnumVal(), description->AsRecordVal());
|
bool res = input_mgr->CreateTableStream(description->AsRecordVal());
|
||||||
return new Val( the_reader != 0, TYPE_BOOL );
|
|
||||||
%}
|
|
||||||
|
|
||||||
function Input::__remove_stream%(id: Input::ID%) : bool
|
|
||||||
%{
|
|
||||||
bool res = input_mgr->RemoveStream(id->AsEnumVal());
|
|
||||||
return new Val( res, TYPE_BOOL );
|
return new Val( res, TYPE_BOOL );
|
||||||
%}
|
%}
|
||||||
|
|
||||||
function Input::__force_update%(id: Input::ID%) : bool
|
function Input::__create_event_stream%(description: Input::EventDescription%) : bool
|
||||||
%{
|
%{
|
||||||
bool res = input_mgr->ForceUpdate(id->AsEnumVal());
|
bool res = input_mgr->CreateEventStream(description->AsRecordVal());
|
||||||
return new Val( res, TYPE_BOOL );
|
return new Val( res, TYPE_BOOL );
|
||||||
%}
|
%}
|
||||||
|
|
||||||
function Input::__add_tablefilter%(id: Input::ID, filter: Input::TableFilter%) : bool
|
function Input::__remove_stream%(id: string%) : bool
|
||||||
%{
|
%{
|
||||||
bool res = input_mgr->AddTableFilter(id->AsEnumVal(), filter->AsRecordVal());
|
bool res = input_mgr->RemoveStream(id->AsString()->CheckString());
|
||||||
return new Val( res, TYPE_BOOL );
|
return new Val( res, TYPE_BOOL );
|
||||||
%}
|
%}
|
||||||
|
|
||||||
function Input::__remove_tablefilter%(id: Input::ID, name: string%) : bool
|
function Input::__force_update%(id: string%) : bool
|
||||||
%{
|
%{
|
||||||
bool res = input_mgr->RemoveTableFilter(id->AsEnumVal(), name->AsString()->CheckString());
|
bool res = input_mgr->ForceUpdate(id->AsString()->CheckString());
|
||||||
return new Val( res, TYPE_BOOL);
|
|
||||||
%}
|
|
||||||
|
|
||||||
function Input::__add_eventfilter%(id: Log::ID, filter: Input::EventFilter%) : bool
|
|
||||||
%{
|
|
||||||
bool res = input_mgr->AddEventFilter(id->AsEnumVal(), filter->AsRecordVal());
|
|
||||||
return new Val( res, TYPE_BOOL );
|
return new Val( res, TYPE_BOOL );
|
||||||
%}
|
%}
|
||||||
|
|
||||||
function Input::__remove_eventfilter%(id: Log::ID, name: string%) : bool
|
|
||||||
%{
|
|
||||||
bool res = input_mgr->RemoveEventFilter(id->AsEnumVal(), name->AsString()->CheckString());
|
|
||||||
return new Val( res, TYPE_BOOL);
|
|
||||||
%}
|
|
||||||
|
|
||||||
# Options for Ascii Reader
|
# Options for Ascii Reader
|
||||||
|
|
||||||
module InputAscii;
|
module InputAscii;
|
||||||
|
|
|
@ -31,14 +31,25 @@ declare(PDict, InputHash);
|
||||||
|
|
||||||
class Manager::Filter {
|
class Manager::Filter {
|
||||||
public:
|
public:
|
||||||
EnumVal* id;
|
|
||||||
string name;
|
string name;
|
||||||
|
string source;
|
||||||
|
|
||||||
|
int mode;
|
||||||
|
|
||||||
FilterType filter_type; // to distinguish between event and table filters
|
FilterType filter_type; // to distinguish between event and table filters
|
||||||
|
|
||||||
|
EnumVal* type;
|
||||||
|
ReaderFrontend* reader;
|
||||||
|
|
||||||
virtual ~Filter();
|
virtual ~Filter();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Manager::Filter::~Filter() {
|
||||||
|
Unref(type);
|
||||||
|
|
||||||
|
delete(reader);
|
||||||
|
}
|
||||||
|
|
||||||
class Manager::TableFilter: public Manager::Filter {
|
class Manager::TableFilter: public Manager::Filter {
|
||||||
public:
|
public:
|
||||||
|
|
||||||
|
@ -85,10 +96,6 @@ Manager::EventFilter::EventFilter() {
|
||||||
filter_type = EVENT_FILTER;
|
filter_type = EVENT_FILTER;
|
||||||
}
|
}
|
||||||
|
|
||||||
Manager::Filter::~Filter() {
|
|
||||||
Unref(id);
|
|
||||||
}
|
|
||||||
|
|
||||||
Manager::TableFilter::~TableFilter() {
|
Manager::TableFilter::~TableFilter() {
|
||||||
Unref(tab);
|
Unref(tab);
|
||||||
Unref(itype);
|
Unref(itype);
|
||||||
|
@ -99,41 +106,6 @@ Manager::TableFilter::~TableFilter() {
|
||||||
delete lastDict;
|
delete lastDict;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Manager::ReaderInfo {
|
|
||||||
EnumVal* id;
|
|
||||||
EnumVal* type;
|
|
||||||
ReaderFrontend* reader;
|
|
||||||
|
|
||||||
map<int, Manager::Filter*> filters; // filters that can prevent our actions
|
|
||||||
|
|
||||||
bool HasFilter(int id);
|
|
||||||
|
|
||||||
~ReaderInfo();
|
|
||||||
};
|
|
||||||
|
|
||||||
Manager::ReaderInfo::~ReaderInfo() {
|
|
||||||
map<int, Manager::Filter*>::iterator it = filters.begin();
|
|
||||||
|
|
||||||
while ( it != filters.end() ) {
|
|
||||||
delete (*it).second;
|
|
||||||
++it;
|
|
||||||
}
|
|
||||||
|
|
||||||
Unref(type);
|
|
||||||
Unref(id);
|
|
||||||
|
|
||||||
delete(reader);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Manager::ReaderInfo::HasFilter(int id) {
|
|
||||||
map<int, Manager::Filter*>::iterator it = filters.find(id);
|
|
||||||
if ( it == filters.end() ) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
struct ReaderDefinition {
|
struct ReaderDefinition {
|
||||||
bro_int_t type; // the type
|
bro_int_t type; // the type
|
||||||
const char *name; // descriptive name for error messages
|
const char *name; // descriptive name for error messages
|
||||||
|
@ -203,37 +175,34 @@ ReaderBackend* Manager::CreateBackend(ReaderFrontend* frontend, bro_int_t type)
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new input reader object to be used at whomevers leisure lateron.
|
// create a new input reader object to be used at whomevers leisure lateron.
|
||||||
ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
bool Manager::CreateStream(Filter* info, RecordVal* description)
|
||||||
{
|
{
|
||||||
{
|
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i != 0 ) {
|
|
||||||
ODesc desc;
|
|
||||||
id->Describe(&desc);
|
|
||||||
reporter->Error("Trying create already existing input stream %s", desc.Description());
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ReaderDefinition* ir = input_readers;
|
ReaderDefinition* ir = input_readers;
|
||||||
|
|
||||||
RecordType* rtype = description->Type()->AsRecordType();
|
RecordType* rtype = description->Type()->AsRecordType();
|
||||||
if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) )
|
if ( ! ( same_type(rtype, BifType::Record::Input::TableDescription, 0) || same_type(rtype, BifType::Record::Input::EventDescription, 0) ) )
|
||||||
{
|
{
|
||||||
ODesc desc;
|
reporter->Error("Streamdescription argument not of right type for new input stream");
|
||||||
id->Describe(&desc);
|
return false;
|
||||||
reporter->Error("Streamdescription argument not of right type for new input stream %s", desc.Description());
|
}
|
||||||
return 0;
|
|
||||||
|
Val* name_val = description->LookupWithDefault(rtype->FieldOffset("name"));
|
||||||
|
string name = name_val->AsString()->CheckString();
|
||||||
|
Unref(name_val);
|
||||||
|
|
||||||
|
{
|
||||||
|
Filter *i = FindFilter(name);
|
||||||
|
if ( i != 0 ) {
|
||||||
|
reporter->Error("Trying create already existing input stream %s", name.c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
|
EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal();
|
||||||
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
|
|
||||||
Val *autostart = description->LookupWithDefault(rtype->FieldOffset("autostart"));
|
Val *autostart = description->LookupWithDefault(rtype->FieldOffset("autostart"));
|
||||||
bool do_autostart = ( autostart->InternalInt() == 1 );
|
|
||||||
Unref(autostart); // Ref'd by LookupWithDefault
|
ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt());
|
||||||
|
assert(reader_obj);
|
||||||
ReaderFrontend* reader_obj = new ReaderFrontend(reader->InternalInt());
|
|
||||||
assert(reader_obj);
|
|
||||||
|
|
||||||
// get the source...
|
// get the source...
|
||||||
Val* sourceval = description->LookupWithDefault(rtype->FieldOffset("source"));
|
Val* sourceval = description->LookupWithDefault(rtype->FieldOffset("source"));
|
||||||
|
@ -241,42 +210,45 @@ ReaderFrontend* Manager::CreateStream(EnumVal* id, RecordVal* description)
|
||||||
const BroString* bsource = sourceval->AsString();
|
const BroString* bsource = sourceval->AsString();
|
||||||
string source((const char*) bsource->Bytes(), bsource->Len());
|
string source((const char*) bsource->Bytes(), bsource->Len());
|
||||||
Unref(sourceval);
|
Unref(sourceval);
|
||||||
|
|
||||||
|
EnumVal* mode = description->LookupWithDefault(rtype->FieldOffset("mode"))->AsEnumVal();
|
||||||
|
info->mode = mode->InternalInt();
|
||||||
|
Unref(mode);
|
||||||
|
|
||||||
ReaderInfo* info = new ReaderInfo;
|
|
||||||
info->reader = reader_obj;
|
info->reader = reader_obj;
|
||||||
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
|
info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault
|
||||||
info->id = id->Ref()->AsEnumVal();
|
info->name = name;
|
||||||
|
info->source = source;
|
||||||
|
|
||||||
readers.push_back(info);
|
|
||||||
|
|
||||||
reader_obj->Init(source, mode->InternalInt(), do_autostart);
|
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
ODesc desc;
|
|
||||||
id->Describe(&desc);
|
|
||||||
DBG_LOG(DBG_INPUT, "Successfully created new input stream %s",
|
DBG_LOG(DBG_INPUT, "Successfully created new input stream %s",
|
||||||
desc.Description());
|
name.c_str());
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return reader_obj;
|
return true;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) {
|
bool Manager::CreateEventStream(RecordVal* fval) {
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->Error("Stream not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
RecordType* rtype = fval->Type()->AsRecordType();
|
RecordType* rtype = fval->Type()->AsRecordType();
|
||||||
if ( ! same_type(rtype, BifType::Record::Input::EventFilter, 0) )
|
if ( ! same_type(rtype, BifType::Record::Input::EventDescription, 0) )
|
||||||
{
|
{
|
||||||
reporter->Error("filter argument not of right type");
|
reporter->Error("filter argument not of right type");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
EventFilter* filter = new EventFilter();
|
||||||
|
{
|
||||||
|
bool res = CreateStream(filter, fval);
|
||||||
|
if ( res == false ) {
|
||||||
|
delete filter;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
Val* name = fval->LookupWithDefault(rtype->FieldOffset("name"));
|
|
||||||
RecordType *fields = fval->LookupWithDefault(rtype->FieldOffset("fields"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
RecordType *fields = fval->LookupWithDefault(rtype->FieldOffset("fields"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
||||||
|
|
||||||
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
|
Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record"));
|
||||||
|
@ -352,42 +324,36 @@ bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) {
|
||||||
}
|
}
|
||||||
|
|
||||||
Unref(fields); // ref'd by lookupwithdefault
|
Unref(fields); // ref'd by lookupwithdefault
|
||||||
EventFilter* filter = new EventFilter();
|
|
||||||
filter->name = name->AsString()->CheckString();
|
|
||||||
Unref(name); // ref'd by lookupwithdefault
|
|
||||||
filter->id = id->Ref()->AsEnumVal();
|
|
||||||
filter->num_fields = fieldsV.size();
|
filter->num_fields = fieldsV.size();
|
||||||
filter->fields = fields->Ref()->AsRecordType();
|
filter->fields = fields->Ref()->AsRecordType();
|
||||||
filter->event = event_registry->Lookup(event->GetID()->Name());
|
filter->event = event_registry->Lookup(event->GetID()->Name());
|
||||||
filter->want_record = ( want_record->InternalInt() == 1 );
|
filter->want_record = ( want_record->InternalInt() == 1 );
|
||||||
Unref(want_record); // ref'd by lookupwithdefault
|
Unref(want_record); // ref'd by lookupwithdefault
|
||||||
|
|
||||||
int filterid = 0;
|
assert(filter->reader);
|
||||||
if ( i->filters.size() > 0 ) {
|
filter->reader->Init(filter->source, filter->mode, filter->num_fields, logf );
|
||||||
filterid = i->filters.rbegin()->first + 1; // largest element is at beginning of map-> new id = old id + 1->
|
|
||||||
}
|
|
||||||
i->filters[filterid] = filter;
|
|
||||||
i->reader->AddFilter( filterid, fieldsV.size(), logf );
|
|
||||||
|
|
||||||
|
readers[filter->reader] = filter;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) {
|
bool Manager::CreateTableStream(RecordVal* fval) {
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->Error("Stream not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
RecordType* rtype = fval->Type()->AsRecordType();
|
RecordType* rtype = fval->Type()->AsRecordType();
|
||||||
if ( ! same_type(rtype, BifType::Record::Input::TableFilter, 0) )
|
if ( ! same_type(rtype, BifType::Record::Input::TableDescription, 0) )
|
||||||
{
|
{
|
||||||
reporter->Error("filter argument not of right type");
|
reporter->Error("filter argument not of right type");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TableFilter* filter = new TableFilter();
|
||||||
|
{
|
||||||
|
bool res = CreateStream(filter, fval);
|
||||||
|
if ( res == false ) {
|
||||||
|
delete filter;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Val* name = fval->LookupWithDefault(rtype->FieldOffset("name"));
|
|
||||||
Val* pred = fval->LookupWithDefault(rtype->FieldOffset("pred"));
|
Val* pred = fval->LookupWithDefault(rtype->FieldOffset("pred"));
|
||||||
|
|
||||||
RecordType *idx = fval->LookupWithDefault(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
RecordType *idx = fval->LookupWithDefault(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType();
|
||||||
|
@ -493,9 +459,6 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) {
|
||||||
fields[i] = fieldsV[i];
|
fields[i] = fieldsV[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
TableFilter* filter = new TableFilter();
|
|
||||||
filter->name = name->AsString()->CheckString();
|
|
||||||
filter->id = id->Ref()->AsEnumVal();
|
|
||||||
filter->pred = pred ? pred->AsFunc() : 0;
|
filter->pred = pred ? pred->AsFunc() : 0;
|
||||||
filter->num_idx_fields = idxfields;
|
filter->num_idx_fields = idxfields;
|
||||||
filter->num_val_fields = valfields;
|
filter->num_val_fields = valfields;
|
||||||
|
@ -508,25 +471,22 @@ bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) {
|
||||||
filter->want_record = ( want_record->InternalInt() == 1 );
|
filter->want_record = ( want_record->InternalInt() == 1 );
|
||||||
|
|
||||||
Unref(want_record); // ref'd by lookupwithdefault
|
Unref(want_record); // ref'd by lookupwithdefault
|
||||||
Unref(name);
|
|
||||||
Unref(pred);
|
Unref(pred);
|
||||||
|
|
||||||
if ( valfields > 1 ) {
|
if ( valfields > 1 ) {
|
||||||
assert(filter->want_record);
|
assert(filter->want_record);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
assert(filter->reader);
|
||||||
|
filter->reader->Init(filter->source, filter->mode, fieldsV.size(), fields );
|
||||||
|
|
||||||
|
readers[filter->reader] = filter;
|
||||||
|
|
||||||
int filterid = 0;
|
|
||||||
if ( i->filters.size() > 0 ) {
|
|
||||||
filterid = i->filters.rbegin()->first + 1; // largest element is at beginning of map-> new id = old id + 1->
|
|
||||||
}
|
|
||||||
i->filters[filterid] = filter;
|
|
||||||
i->reader->AddFilter( filterid, fieldsV.size(), fields );
|
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
ODesc desc;
|
DBG_LOG(DBG_INPUT, "Successfully created table stream %s",
|
||||||
id->Describe(&desc);
|
filter->name.c_str());
|
||||||
DBG_LOG(DBG_INPUT, "Successfully created new table filter %s for stream",
|
|
||||||
filter->name.c_str(), desc.Description());
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -583,16 +543,8 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool Manager::RemoveStream(const EnumVal* id) {
|
bool Manager::RemoveStream(const string &name) {
|
||||||
ReaderInfo *i = 0;
|
Filter *i = FindFilter(name);
|
||||||
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
|
|
||||||
{
|
|
||||||
if ( (*s)->id == id )
|
|
||||||
{
|
|
||||||
i = (*s);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
return false; // not found
|
return false; // not found
|
||||||
|
@ -601,38 +553,29 @@ bool Manager::RemoveStream(const EnumVal* id) {
|
||||||
i->reader->Finish();
|
i->reader->Finish();
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
ODesc desc;
|
|
||||||
id->Describe(&desc);
|
|
||||||
DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
|
DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
|
||||||
desc.Description());
|
name.c_str());
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::RemoveStreamContinuation(const ReaderFrontend* reader) {
|
bool Manager::RemoveStreamContinuation(ReaderFrontend* reader) {
|
||||||
ReaderInfo *i = 0;
|
Filter *i = FindFilter(reader);
|
||||||
|
|
||||||
|
if ( i == 0 ) {
|
||||||
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
|
reporter->Error("Stream not found in RemoveStreamContinuation");
|
||||||
{
|
return false;
|
||||||
if ( (*s)->reader && (*s)->reader == reader )
|
|
||||||
{
|
|
||||||
i = *s;
|
|
||||||
#ifdef DEBUG
|
|
||||||
ODesc desc;
|
|
||||||
i->id->Describe(&desc);
|
|
||||||
DBG_LOG(DBG_INPUT, "Successfully executed removal of stream %s",
|
|
||||||
desc.Description());
|
|
||||||
#endif
|
|
||||||
delete(i);
|
|
||||||
readers.erase(s);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
reporter->Error("Stream not found in RemoveStreamContinuation");
|
|
||||||
return false;
|
#ifdef DEBUG
|
||||||
|
DBG_LOG(DBG_INPUT, "Successfully executed removal of stream %s",
|
||||||
|
i->name.c_str());
|
||||||
|
#endif
|
||||||
|
readers.erase(reader);
|
||||||
|
delete(i);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, const string& nameprepend) {
|
bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, const string& nameprepend) {
|
||||||
|
@ -680,132 +623,24 @@ bool Manager::UnrollRecordType(vector<Field*> *fields, const RecordType *rec, co
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::ForceUpdate(const EnumVal* id)
|
bool Manager::ForceUpdate(const string &name)
|
||||||
{
|
{
|
||||||
ReaderInfo *i = FindReader(id);
|
Filter *i = FindFilter(name);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->Error("Reader not found");
|
reporter->Error("Stream %s not found", name.c_str());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
i->reader->Update();
|
i->reader->Update();
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
ODesc desc;
|
|
||||||
id->Describe(&desc);
|
|
||||||
DBG_LOG(DBG_INPUT, "Forcing update of stream %s",
|
DBG_LOG(DBG_INPUT, "Forcing update of stream %s",
|
||||||
desc.Description());
|
name.c_str());
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
return true; // update is async :(
|
return true; // update is async :(
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Manager::RemoveTableFilter(EnumVal* id, const string &name) {
|
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->Error("Reader not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool found = false;
|
|
||||||
int filterId;
|
|
||||||
|
|
||||||
for ( map<int, Manager::Filter*>::iterator it = i->filters.begin(); it != i->filters.end(); ++it ) {
|
|
||||||
if ( (*it).second->name == name ) {
|
|
||||||
found = true;
|
|
||||||
filterId = (*it).first;
|
|
||||||
|
|
||||||
if ( (*it).second->filter_type != TABLE_FILTER ) {
|
|
||||||
reporter->Error("Trying to remove filter %s of wrong type", name.c_str());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( !found ) {
|
|
||||||
reporter->Error("Trying to remove nonexisting filter %s", name.c_str());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
i->reader->RemoveFilter(filterId);
|
|
||||||
|
|
||||||
#ifdef DEBUG
|
|
||||||
ODesc desc;
|
|
||||||
id->Describe(&desc);
|
|
||||||
DBG_LOG(DBG_INPUT, "Queued removal of tablefilter %s for stream %s",
|
|
||||||
name.c_str(), desc.Description());
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Manager::RemoveFilterContinuation(const ReaderFrontend* reader, const int filterId) {
|
|
||||||
ReaderInfo *i = FindReader(reader);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->Error("Reader not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
map<int, Manager::Filter*>::iterator it = i->filters.find(filterId);
|
|
||||||
if ( it == i->filters.end() ) {
|
|
||||||
reporter->Error("Got RemoveFilterContinuation where filter nonexistant for %d", filterId);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef DEBUG
|
|
||||||
ODesc desc;
|
|
||||||
i->id->Describe(&desc);
|
|
||||||
DBG_LOG(DBG_INPUT, "Executed removal of (table|event)-filter %s for stream %s",
|
|
||||||
(*it).second->name.c_str(), desc.Description());
|
|
||||||
#endif
|
|
||||||
|
|
||||||
delete (*it).second;
|
|
||||||
i->filters.erase(it);
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Manager::RemoveEventFilter(EnumVal* id, const string &name) {
|
|
||||||
ReaderInfo *i = FindReader(id);
|
|
||||||
if ( i == 0 ) {
|
|
||||||
reporter->Error("Reader not found");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool found = false;
|
|
||||||
int filterId;
|
|
||||||
for ( map<int, Manager::Filter*>::iterator it = i->filters.begin(); it != i->filters.end(); ++it ) {
|
|
||||||
if ( (*it).second->name == name ) {
|
|
||||||
found = true;
|
|
||||||
filterId = (*it).first;
|
|
||||||
|
|
||||||
if ( (*it).second->filter_type != EVENT_FILTER ) {
|
|
||||||
reporter->Error("Trying to remove filter %s of wrong type", name.c_str());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( !found ) {
|
|
||||||
reporter->Error("Trying to remove nonexisting filter %s", name.c_str());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
i->reader->RemoveFilter(filterId);
|
|
||||||
|
|
||||||
#ifdef DEBUG
|
|
||||||
ODesc desc;
|
|
||||||
id->Describe(&desc);
|
|
||||||
DBG_LOG(DBG_INPUT, "Queued removal of eventfilter %s for stream %s",
|
|
||||||
name.c_str(), desc.Description());
|
|
||||||
#endif
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) {
|
Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Value* const *vals) {
|
||||||
Val* idxval;
|
Val* idxval;
|
||||||
int position = 0;
|
int position = 0;
|
||||||
|
@ -833,24 +668,19 @@ Val* Manager::ValueToIndexVal(int num_fields, const RecordType *type, const Valu
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Manager::SendEntry(const ReaderFrontend* reader, const int id, Value* *vals) {
|
void Manager::SendEntry(ReaderFrontend* reader, Value* *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
Filter *i = FindFilter(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader in SendEntry");
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( !i->HasFilter(id) ) {
|
|
||||||
reporter->InternalError("Unknown filter");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int readFields;
|
int readFields;
|
||||||
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
if ( i->filter_type == TABLE_FILTER ) {
|
||||||
readFields = SendEntryTable(reader, id, vals);
|
readFields = SendEntryTable(i, vals);
|
||||||
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
} else if ( i->filter_type == EVENT_FILTER ) {
|
||||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
||||||
readFields = SendEventFilterEvent(reader, type, id, vals);
|
readFields = SendEventFilterEvent(i, type, vals);
|
||||||
} else {
|
} else {
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
@ -861,16 +691,13 @@ void Manager::SendEntry(const ReaderFrontend* reader, const int id, Value* *vals
|
||||||
delete [] vals;
|
delete [] vals;
|
||||||
}
|
}
|
||||||
|
|
||||||
int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Value* const *vals) {
|
int Manager::SendEntryTable(Filter* i, const Value* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
|
||||||
|
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
|
|
||||||
assert(i);
|
assert(i);
|
||||||
assert(i->HasFilter(id));
|
|
||||||
|
|
||||||
assert(i->filters[id]->filter_type == TABLE_FILTER);
|
assert(i->filter_type == TABLE_FILTER);
|
||||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
TableFilter* filter = (TableFilter*) i;
|
||||||
|
|
||||||
HashKey* idxhash = HashValues(filter->num_idx_fields, vals);
|
HashKey* idxhash = HashValues(filter->num_idx_fields, vals);
|
||||||
|
|
||||||
|
@ -1006,29 +833,25 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
|
void Manager::EndCurrentSend(ReaderFrontend* reader) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
Filter *i = FindFilter(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader in EndCurrentSend");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(i->HasFilter(id));
|
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
ODesc desc;
|
DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s",
|
||||||
i->id->Describe(&desc);
|
i->name.c_str());
|
||||||
DBG_LOG(DBG_INPUT, "Got EndCurrentSend for filter %d and stream %s",
|
|
||||||
id, desc.Description());
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
if ( i->filter_type == EVENT_FILTER ) {
|
||||||
// nothing to do..
|
// nothing to do..
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(i->filters[id]->filter_type == TABLE_FILTER);
|
assert(i->filter_type == TABLE_FILTER);
|
||||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
TableFilter* filter = (TableFilter*) i;
|
||||||
|
|
||||||
// lastdict contains all deleted entries and should be empty apart from that
|
// lastdict contains all deleted entries and should be empty apart from that
|
||||||
IterCookie *c = filter->lastDict->InitForIteration();
|
IterCookie *c = filter->lastDict->InitForIteration();
|
||||||
|
@ -1093,8 +916,8 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
|
||||||
filter->currDict = new PDict(InputHash);
|
filter->currDict = new PDict(InputHash);
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for filter %d and stream %s, queueing update_finished event",
|
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event",
|
||||||
id, desc.Description());
|
i->name.c_str());
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Send event that the current update is indeed finished.
|
// Send event that the current update is indeed finished.
|
||||||
|
@ -1104,42 +927,40 @@ void Manager::EndCurrentSend(const ReaderFrontend* reader, int id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Ref(i->id);
|
SendEvent(handler, 1, new BroString(i->name));
|
||||||
SendEvent(handler, 1, i->id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::Put(const ReaderFrontend* reader, int id, Value* *vals) {
|
void Manager::Put(ReaderFrontend* reader, Value* *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
Filter *i = FindFilter(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader in Put");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( !i->HasFilter(id) ) {
|
int readFields;
|
||||||
reporter->InternalError("Unknown filter");
|
if ( i->filter_type == TABLE_FILTER ) {
|
||||||
return;
|
readFields = PutTable(i, vals);
|
||||||
}
|
} else if ( i->filter_type == EVENT_FILTER ) {
|
||||||
|
|
||||||
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
|
||||||
PutTable(reader, id, vals);
|
|
||||||
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
|
||||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event);
|
||||||
SendEventFilterEvent(reader, type, id, vals);
|
readFields = SendEventFilterEvent(i, type, vals);
|
||||||
} else {
|
} else {
|
||||||
assert(false);
|
assert(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for ( int i = 0; i < readFields; i++ ) {
|
||||||
|
delete vals[i];
|
||||||
|
}
|
||||||
|
delete [] vals;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const Value* const *vals) {
|
int Manager::SendEventFilterEvent(Filter* i, EnumVal* type, const Value* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
|
||||||
|
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
|
|
||||||
assert(i);
|
assert(i);
|
||||||
assert(i->HasFilter(id));
|
|
||||||
|
|
||||||
assert(i->filters[id]->filter_type == EVENT_FILTER);
|
assert(i->filter_type == EVENT_FILTER);
|
||||||
EventFilter* filter = (EventFilter*) i->filters[id];
|
EventFilter* filter = (EventFilter*) i;
|
||||||
|
|
||||||
Val *val;
|
Val *val;
|
||||||
list<Val*> out_vals;
|
list<Val*> out_vals;
|
||||||
|
@ -1170,14 +991,11 @@ int Manager::SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, i
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *vals) {
|
int Manager::PutTable(Filter* i, const Value* const *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
|
||||||
|
|
||||||
assert(i);
|
assert(i);
|
||||||
assert(i->HasFilter(id));
|
|
||||||
|
|
||||||
assert(i->filters[id]->filter_type == TABLE_FILTER);
|
assert(i->filter_type == TABLE_FILTER);
|
||||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
TableFilter* filter = (TableFilter*) i;
|
||||||
|
|
||||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||||
Val* valval;
|
Val* valval;
|
||||||
|
@ -1274,43 +1092,37 @@ int Manager::PutTable(const ReaderFrontend* reader, int id, const Value* const *
|
||||||
}
|
}
|
||||||
|
|
||||||
// Todo:: perhaps throw some kind of clear-event?
|
// Todo:: perhaps throw some kind of clear-event?
|
||||||
void Manager::Clear(const ReaderFrontend* reader, int id) {
|
void Manager::Clear(ReaderFrontend* reader) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
Filter *i = FindFilter(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader in Clear");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef DEBUG
|
#ifdef DEBUG
|
||||||
ODesc desc;
|
DBG_LOG(DBG_INPUT, "Got Clear for stream %s",
|
||||||
i->id->Describe(&desc);
|
i->name.c_str());
|
||||||
DBG_LOG(DBG_INPUT, "Got Clear for filter %d and stream %s",
|
|
||||||
id, desc.Description());
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
assert(i->HasFilter(id));
|
assert(i->filter_type == TABLE_FILTER);
|
||||||
|
TableFilter* filter = (TableFilter*) i;
|
||||||
assert(i->filters[id]->filter_type == TABLE_FILTER);
|
|
||||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
|
||||||
|
|
||||||
filter->tab->RemoveAll();
|
filter->tab->RemoveAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
// put interface: delete old entry from table.
|
// put interface: delete old entry from table.
|
||||||
bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) {
|
bool Manager::Delete(ReaderFrontend* reader, Value* *vals) {
|
||||||
ReaderInfo *i = FindReader(reader);
|
Filter *i = FindFilter(reader);
|
||||||
if ( i == 0 ) {
|
if ( i == 0 ) {
|
||||||
reporter->InternalError("Unknown reader");
|
reporter->InternalError("Unknown reader in Delete");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(i->HasFilter(id));
|
|
||||||
|
|
||||||
bool success = false;
|
bool success = false;
|
||||||
int readVals = 0;
|
int readVals = 0;
|
||||||
|
|
||||||
if ( i->filters[id]->filter_type == TABLE_FILTER ) {
|
if ( i->filter_type == TABLE_FILTER ) {
|
||||||
TableFilter* filter = (TableFilter*) i->filters[id];
|
TableFilter* filter = (TableFilter*) i;
|
||||||
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
Val* idxval = ValueToIndexVal(filter->num_idx_fields, filter->itype, vals);
|
||||||
assert(idxval != 0);
|
assert(idxval != 0);
|
||||||
readVals = filter->num_idx_fields + filter->num_val_fields;
|
readVals = filter->num_idx_fields + filter->num_val_fields;
|
||||||
|
@ -1352,9 +1164,9 @@ bool Manager::Delete(const ReaderFrontend* reader, int id, Value* *vals) {
|
||||||
reporter->Error("Internal error while deleting values from input table");
|
reporter->Error("Internal error while deleting values from input table");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if ( i->filters[id]->filter_type == EVENT_FILTER ) {
|
} else if ( i->filter_type == EVENT_FILTER ) {
|
||||||
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
|
EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event);
|
||||||
readVals = SendEventFilterEvent(reader, type, id, vals);
|
readVals = SendEventFilterEvent(i, type, vals);
|
||||||
success = true;
|
success = true;
|
||||||
} else {
|
} else {
|
||||||
assert(false);
|
assert(false);
|
||||||
|
@ -1867,29 +1679,24 @@ Val* Manager::ValueToVal(const Value* val, BroType* request_type) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
Manager::ReaderInfo* Manager::FindReader(const ReaderFrontend* reader)
|
Manager::Filter* Manager::FindFilter(const string &name)
|
||||||
{
|
{
|
||||||
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
|
for ( map<ReaderFrontend*, Filter*>::iterator s = readers.begin(); s != readers.end(); ++s )
|
||||||
{
|
{
|
||||||
if ( (*s)->reader && (*s)->reader == reader )
|
if ( (*s).second->name == name )
|
||||||
{
|
{
|
||||||
return *s;
|
return (*s).second;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Manager::ReaderInfo* Manager::FindReader(const EnumVal* id)
|
Manager::Filter* Manager::FindFilter(ReaderFrontend* reader)
|
||||||
{
|
{
|
||||||
for ( vector<ReaderInfo *>::iterator s = readers.begin(); s != readers.end(); ++s )
|
map<ReaderFrontend*, Filter*>::iterator s = readers.find(reader);
|
||||||
{
|
if ( s != readers.end() ) {
|
||||||
if ( (*s)->id && (*s)->id->AsEnum() == id->AsEnum() )
|
return s->second;
|
||||||
{
|
|
||||||
return *s;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
#include "../EventHandler.h"
|
#include "../EventHandler.h"
|
||||||
#include "../RemoteSerializer.h"
|
#include "../RemoteSerializer.h"
|
||||||
|
|
||||||
#include <vector>
|
#include <map>
|
||||||
|
|
||||||
namespace input {
|
namespace input {
|
||||||
|
|
||||||
|
@ -35,6 +35,9 @@ public:
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new input stream.
|
* Creates a new input stream.
|
||||||
|
* Add a filter to an input source, which will write the data from the data source into
|
||||||
|
* a Bro table.
|
||||||
|
* Add a filter to an input source, which sends events for read input data.
|
||||||
*
|
*
|
||||||
* @param id The enum value corresponding the input stream.
|
* @param id The enum value corresponding the input stream.
|
||||||
*
|
*
|
||||||
|
@ -43,7 +46,9 @@ public:
|
||||||
* This method corresponds directly to the internal BiF defined in
|
* This method corresponds directly to the internal BiF defined in
|
||||||
* input.bif, which just forwards here.
|
* input.bif, which just forwards here.
|
||||||
*/
|
*/
|
||||||
ReaderFrontend* CreateStream(EnumVal* id, RecordVal* description);
|
bool CreateTableStream(RecordVal* description);
|
||||||
|
bool CreateEventStream(RecordVal* description);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force update on a input stream.
|
* Force update on a input stream.
|
||||||
|
@ -57,7 +62,7 @@ public:
|
||||||
* This method corresponds directly to the internal BiF defined in
|
* This method corresponds directly to the internal BiF defined in
|
||||||
* input.bif, which just forwards here.
|
* input.bif, which just forwards here.
|
||||||
*/
|
*/
|
||||||
bool ForceUpdate(const EnumVal* id);
|
bool ForceUpdate(const string &id);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes an existing input stream
|
* Deletes an existing input stream
|
||||||
|
@ -67,53 +72,8 @@ public:
|
||||||
* This method corresponds directly to the internal BiF defined in
|
* This method corresponds directly to the internal BiF defined in
|
||||||
* input.bif, which just forwards here.
|
* input.bif, which just forwards here.
|
||||||
*/
|
*/
|
||||||
bool RemoveStream(const EnumVal* id);
|
bool RemoveStream(const string &id);
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a filter to an input source, which will write the data from the data source into
|
|
||||||
* a Bro table.
|
|
||||||
*
|
|
||||||
* @param id The enum value corresponding the input stream.
|
|
||||||
*
|
|
||||||
* @param description A record of script type \c Input:TableFilter.
|
|
||||||
*
|
|
||||||
* This method corresponds directly to the internal BiF defined in
|
|
||||||
* input.bif, which just forwards here.
|
|
||||||
*/
|
|
||||||
bool AddTableFilter(EnumVal *id, RecordVal* filter);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a tablefilter from the log stream
|
|
||||||
*
|
|
||||||
* @param id The enum value corresponding the input stream.
|
|
||||||
*
|
|
||||||
* This method corresponds directly to the internal BiF defined in
|
|
||||||
* input.bif, which just forwards here.
|
|
||||||
*/
|
|
||||||
bool RemoveTableFilter(EnumVal* id, const string &name);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a filter to an input source, which sends events for read input data.
|
|
||||||
*
|
|
||||||
* @param id The enum value corresponding the input stream.
|
|
||||||
*
|
|
||||||
* @param description A record of script type \c Input:EventFilter.
|
|
||||||
*
|
|
||||||
* This method corresponds directly to the internal BiF defined in
|
|
||||||
* input.bif, which just forwards here.
|
|
||||||
*/
|
|
||||||
bool AddEventFilter(EnumVal *id, RecordVal* filter);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a eventfilter from the log stream
|
|
||||||
*
|
|
||||||
* @param id The enum value corresponding the input stream.
|
|
||||||
*
|
|
||||||
* This method corresponds directly to the internal BiF defined in
|
|
||||||
* input.bif, which just forwards here.
|
|
||||||
*/
|
|
||||||
bool RemoveEventFilter(EnumVal* id, const string &name);
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
friend class ReaderFrontend;
|
friend class ReaderFrontend;
|
||||||
friend class PutMessage;
|
friend class PutMessage;
|
||||||
|
@ -122,19 +82,18 @@ protected:
|
||||||
friend class SendEventMessage;
|
friend class SendEventMessage;
|
||||||
friend class SendEntryMessage;
|
friend class SendEntryMessage;
|
||||||
friend class EndCurrentSendMessage;
|
friend class EndCurrentSendMessage;
|
||||||
friend class FilterRemovedMessage;
|
|
||||||
friend class ReaderFinishedMessage;
|
friend class ReaderFinishedMessage;
|
||||||
|
|
||||||
// For readers to write to input stream in direct mode (reporting new/deleted values directly)
|
// For readers to write to input stream in direct mode (reporting new/deleted values directly)
|
||||||
// Functions take ownership of threading::Value fields
|
// Functions take ownership of threading::Value fields
|
||||||
void Put(const ReaderFrontend* reader, int id, threading::Value* *vals);
|
void Put(ReaderFrontend* reader, threading::Value* *vals);
|
||||||
void Clear(const ReaderFrontend* reader, int id);
|
void Clear(ReaderFrontend* reader);
|
||||||
bool Delete(const ReaderFrontend* reader, int id, threading::Value* *vals);
|
bool Delete(ReaderFrontend* reader, threading::Value* *vals);
|
||||||
|
|
||||||
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
|
// for readers to write to input stream in indirect mode (manager is monitoring new/deleted values)
|
||||||
// Functions take ownership of threading::Value fields
|
// Functions take ownership of threading::Value fields
|
||||||
void SendEntry(const ReaderFrontend* reader, const int id, threading::Value* *vals);
|
void SendEntry(ReaderFrontend* reader, threading::Value* *vals);
|
||||||
void EndCurrentSend(const ReaderFrontend* reader, const int id);
|
void EndCurrentSend(ReaderFrontend* reader);
|
||||||
|
|
||||||
// Allows readers to directly send Bro events.
|
// Allows readers to directly send Bro events.
|
||||||
// The num_vals and vals must be the same the named event expects.
|
// The num_vals and vals must be the same the named event expects.
|
||||||
|
@ -150,20 +109,23 @@ protected:
|
||||||
// Used to prevent race conditions where data for a specific filter is still in the queue when the
|
// Used to prevent race conditions where data for a specific filter is still in the queue when the
|
||||||
// RemoveFilter directive is executed by the main thread.
|
// RemoveFilter directive is executed by the main thread.
|
||||||
// This makes sure all data that has ben queued for a filter is still received.
|
// This makes sure all data that has ben queued for a filter is still received.
|
||||||
bool RemoveFilterContinuation(const ReaderFrontend* reader, const int filterId);
|
bool RemoveStreamContinuation(ReaderFrontend* reader);
|
||||||
bool RemoveStreamContinuation(const ReaderFrontend* reader);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct ReaderInfo;
|
class Filter;
|
||||||
|
class TableFilter;
|
||||||
|
class EventFilter;
|
||||||
|
|
||||||
|
bool CreateStream(Filter*, RecordVal* description);
|
||||||
|
|
||||||
// SendEntry implementation for Tablefilter
|
// SendEntry implementation for Tablefilter
|
||||||
int SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
int SendEntryTable(Filter* i, const threading::Value* const *vals);
|
||||||
|
|
||||||
// Put implementation for Tablefilter
|
// Put implementation for Tablefilter
|
||||||
int PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals);
|
int PutTable(Filter* i, const threading::Value* const *vals);
|
||||||
|
|
||||||
// SendEntry and Put implementation for Eventfilter
|
// SendEntry and Put implementation for Eventfilter
|
||||||
int SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals);
|
int SendEventFilterEvent(Filter* i, EnumVal* type, const threading::Value* const *vals);
|
||||||
|
|
||||||
// Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types
|
// Checks is a bro type can be used for data reading. The equivalend in threading cannot be used, because we have support different types
|
||||||
// from the log framework
|
// from the log framework
|
||||||
|
@ -200,16 +162,12 @@ private:
|
||||||
// Converts a Bro ListVal to a RecordVal given the record type
|
// Converts a Bro ListVal to a RecordVal given the record type
|
||||||
RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position);
|
RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position);
|
||||||
|
|
||||||
ReaderInfo* FindReader(const ReaderFrontend* reader);
|
Filter* FindFilter(const string &name);
|
||||||
ReaderInfo* FindReader(const EnumVal* id);
|
Filter* FindFilter(ReaderFrontend* reader);
|
||||||
|
|
||||||
vector<ReaderInfo*> readers;
|
|
||||||
|
|
||||||
class Filter;
|
|
||||||
class TableFilter;
|
|
||||||
class EventFilter;
|
|
||||||
|
|
||||||
enum FilterType { TABLE_FILTER, EVENT_FILTER };
|
enum FilterType { TABLE_FILTER, EVENT_FILTER };
|
||||||
|
|
||||||
|
map<ReaderFrontend*, Filter*> readers;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -11,48 +11,44 @@ namespace input {
|
||||||
|
|
||||||
class PutMessage : public threading::OutputMessage<ReaderFrontend> {
|
class PutMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
PutMessage(ReaderFrontend* reader, int id, Value* *val)
|
PutMessage(ReaderFrontend* reader, Value* *val)
|
||||||
: threading::OutputMessage<ReaderFrontend>("Put", reader),
|
: threading::OutputMessage<ReaderFrontend>("Put", reader),
|
||||||
id(id), val(val) {}
|
val(val) {}
|
||||||
|
|
||||||
virtual bool Process() {
|
virtual bool Process() {
|
||||||
input_mgr->Put(Object(), id, val);
|
input_mgr->Put(Object(), val);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int id;
|
|
||||||
Value* *val;
|
Value* *val;
|
||||||
};
|
};
|
||||||
|
|
||||||
class DeleteMessage : public threading::OutputMessage<ReaderFrontend> {
|
class DeleteMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
DeleteMessage(ReaderFrontend* reader, int id, Value* *val)
|
DeleteMessage(ReaderFrontend* reader, Value* *val)
|
||||||
: threading::OutputMessage<ReaderFrontend>("Delete", reader),
|
: threading::OutputMessage<ReaderFrontend>("Delete", reader),
|
||||||
id(id), val(val) {}
|
val(val) {}
|
||||||
|
|
||||||
virtual bool Process() {
|
virtual bool Process() {
|
||||||
return input_mgr->Delete(Object(), id, val);
|
return input_mgr->Delete(Object(), val);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int id;
|
|
||||||
Value* *val;
|
Value* *val;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ClearMessage : public threading::OutputMessage<ReaderFrontend> {
|
class ClearMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
ClearMessage(ReaderFrontend* reader, int id)
|
ClearMessage(ReaderFrontend* reader)
|
||||||
: threading::OutputMessage<ReaderFrontend>("Clear", reader),
|
: threading::OutputMessage<ReaderFrontend>("Clear", reader) {}
|
||||||
id(id) {}
|
|
||||||
|
|
||||||
virtual bool Process() {
|
virtual bool Process() {
|
||||||
input_mgr->Clear(Object(), id);
|
input_mgr->Clear(Object());
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int id;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class SendEventMessage : public threading::OutputMessage<ReaderFrontend> {
|
class SendEventMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
|
@ -73,47 +69,30 @@ private:
|
||||||
|
|
||||||
class SendEntryMessage : public threading::OutputMessage<ReaderFrontend> {
|
class SendEntryMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
SendEntryMessage(ReaderFrontend* reader, const int id, Value* *val)
|
SendEntryMessage(ReaderFrontend* reader, Value* *val)
|
||||||
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
|
: threading::OutputMessage<ReaderFrontend>("SendEntry", reader),
|
||||||
id(id), val(val) { }
|
val(val) { }
|
||||||
|
|
||||||
virtual bool Process() {
|
virtual bool Process() {
|
||||||
input_mgr->SendEntry(Object(), id, val);
|
input_mgr->SendEntry(Object(), val);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const int id;
|
|
||||||
Value* *val;
|
Value* *val;
|
||||||
};
|
};
|
||||||
|
|
||||||
class EndCurrentSendMessage : public threading::OutputMessage<ReaderFrontend> {
|
class EndCurrentSendMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
public:
|
public:
|
||||||
EndCurrentSendMessage(ReaderFrontend* reader, const int id)
|
EndCurrentSendMessage(ReaderFrontend* reader)
|
||||||
: threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader),
|
: threading::OutputMessage<ReaderFrontend>("EndCurrentSend", reader) {}
|
||||||
id(id) {}
|
|
||||||
|
|
||||||
virtual bool Process() {
|
virtual bool Process() {
|
||||||
input_mgr->EndCurrentSend(Object(), id);
|
input_mgr->EndCurrentSend(Object());
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const int id;
|
|
||||||
};
|
|
||||||
|
|
||||||
class FilterRemovedMessage : public threading::OutputMessage<ReaderFrontend> {
|
|
||||||
public:
|
|
||||||
FilterRemovedMessage(ReaderFrontend* reader, const int id)
|
|
||||||
: threading::OutputMessage<ReaderFrontend>("FilterRemoved", reader),
|
|
||||||
id(id) {}
|
|
||||||
|
|
||||||
virtual bool Process() {
|
|
||||||
return input_mgr->RemoveFilterContinuation(Object(), id);
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
const int id;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class ReaderFinishedMessage : public threading::OutputMessage<ReaderFrontend> {
|
class ReaderFinishedMessage : public threading::OutputMessage<ReaderFrontend> {
|
||||||
|
@ -155,19 +134,19 @@ ReaderBackend::~ReaderBackend()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::Put(int id, Value* *val)
|
void ReaderBackend::Put(Value* *val)
|
||||||
{
|
{
|
||||||
SendOut(new PutMessage(frontend, id, val));
|
SendOut(new PutMessage(frontend, val));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::Delete(int id, Value* *val)
|
void ReaderBackend::Delete(Value* *val)
|
||||||
{
|
{
|
||||||
SendOut(new DeleteMessage(frontend, id, val));
|
SendOut(new DeleteMessage(frontend, val));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::Clear(int id)
|
void ReaderBackend::Clear()
|
||||||
{
|
{
|
||||||
SendOut(new ClearMessage(frontend, id));
|
SendOut(new ClearMessage(frontend));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals)
|
void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *vals)
|
||||||
|
@ -175,70 +154,32 @@ void ReaderBackend::SendEvent(const string& name, const int num_vals, Value* *va
|
||||||
SendOut(new SendEventMessage(frontend, name, num_vals, vals));
|
SendOut(new SendEventMessage(frontend, name, num_vals, vals));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::EndCurrentSend(int id)
|
void ReaderBackend::EndCurrentSend()
|
||||||
{
|
{
|
||||||
SendOut(new EndCurrentSendMessage(frontend, id));
|
SendOut(new EndCurrentSendMessage(frontend));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderBackend::SendEntry(int id, Value* *vals)
|
void ReaderBackend::SendEntry(Value* *vals)
|
||||||
{
|
{
|
||||||
SendOut(new SendEntryMessage(frontend, id, vals));
|
SendOut(new SendEntryMessage(frontend, vals));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ReaderBackend::Init(string arg_source, int mode, bool arg_autostart)
|
bool ReaderBackend::Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* arg_fields)
|
||||||
{
|
{
|
||||||
source = arg_source;
|
source = arg_source;
|
||||||
autostart = arg_autostart;
|
|
||||||
SetName("InputReader/"+source);
|
SetName("InputReader/"+source);
|
||||||
|
|
||||||
// disable if DoInit returns error.
|
// disable if DoInit returns error.
|
||||||
disabled = !DoInit(arg_source, mode);
|
int success = DoInit(arg_source, mode, arg_num_fields, arg_fields);
|
||||||
|
|
||||||
if ( disabled ) {
|
if ( !success ) {
|
||||||
Error("Init failed");
|
Error("Init failed");
|
||||||
DisableFrontend();
|
DisableFrontend();
|
||||||
}
|
}
|
||||||
|
|
||||||
return !disabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ReaderBackend::StartReading() {
|
|
||||||
if ( disabled )
|
|
||||||
return false;
|
|
||||||
|
|
||||||
int success = DoStartReading();
|
|
||||||
|
|
||||||
if ( success == false ) {
|
|
||||||
DisableFrontend();
|
|
||||||
}
|
|
||||||
|
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ReaderBackend::AddFilter(int id, int arg_num_fields,
|
|
||||||
const Field* const * arg_fields)
|
|
||||||
{
|
|
||||||
if ( disabled )
|
|
||||||
return false;
|
|
||||||
|
|
||||||
bool success = DoAddFilter(id, arg_num_fields, arg_fields);
|
|
||||||
if ( success && autostart) {
|
|
||||||
autostart = false;
|
|
||||||
return StartReading();
|
|
||||||
}
|
|
||||||
return success;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ReaderBackend::RemoveFilter(int id)
|
|
||||||
{
|
|
||||||
if ( disabled )
|
|
||||||
return false;
|
|
||||||
|
|
||||||
bool success = DoRemoveFilter(id);
|
|
||||||
SendOut(new FilterRemovedMessage(frontend, id));
|
|
||||||
return success; // yes, I know, noone reads this.
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReaderBackend::Finish()
|
void ReaderBackend::Finish()
|
||||||
{
|
{
|
||||||
DoFinish();
|
DoFinish();
|
||||||
|
|
|
@ -53,46 +53,14 @@ public:
|
||||||
*
|
*
|
||||||
* @param mode the opening mode for the input source
|
* @param mode the opening mode for the input source
|
||||||
*
|
*
|
||||||
* @param autostart automatically start the input source after the first filter has been added
|
|
||||||
*
|
|
||||||
* @return False if an error occured.
|
|
||||||
*/
|
|
||||||
bool Init(string arg_source, int mode, bool autostart);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* One-time start method of the reader.
|
|
||||||
*
|
|
||||||
* This method is called from the scripting layer, after all filters have been added.
|
|
||||||
* No data should be read before this method is called.
|
|
||||||
*
|
|
||||||
* If autostart in Init is set to true, this method is called automatically by the backend after
|
|
||||||
* the first filter has been added.
|
|
||||||
*/
|
|
||||||
bool StartReading();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add an input filter to the input stream
|
|
||||||
*
|
|
||||||
* @param id identifier of the input stream
|
|
||||||
*
|
|
||||||
* @param arg_num_fields number of fields contained in \a fields
|
* @param arg_num_fields number of fields contained in \a fields
|
||||||
*
|
*
|
||||||
* @param fields the types and names of the fields to be retrieved from the input source
|
* @param fields the types and names of the fields to be retrieved from the input source
|
||||||
*
|
*
|
||||||
* @return False if an error occured.
|
* @return False if an error occured.
|
||||||
*/
|
*/
|
||||||
bool AddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
|
bool Init(string arg_source, int mode, int arg_num_fields, const threading::Field* const* fields);
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Remove an input filter to the input stream
|
|
||||||
*
|
|
||||||
* @param id identifier of the input stream
|
|
||||||
*
|
|
||||||
* @return False if an error occured.
|
|
||||||
*/
|
|
||||||
bool RemoveFilter ( int id );
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finishes reading from this input stream in a regular fashion. Must not be
|
* Finishes reading from this input stream in a regular fashion. Must not be
|
||||||
* called if an error has been indicated earlier. After calling this,
|
* called if an error has been indicated earlier. After calling this,
|
||||||
|
@ -131,33 +99,7 @@ protected:
|
||||||
* disabled and eventually deleted. When returning false, an
|
* disabled and eventually deleted. When returning false, an
|
||||||
* implementation should also call Error() to indicate what happened.
|
* implementation should also call Error() to indicate what happened.
|
||||||
*/
|
*/
|
||||||
virtual bool DoInit(string arg_sources, int mode) = 0;
|
virtual bool DoInit(string arg_sources, int mode, int arg_num_fields, const threading::Field* const* fields) = 0;
|
||||||
|
|
||||||
/**
|
|
||||||
* Reader-specific start method. After this function has been called, data may be read from
|
|
||||||
* the input source and be sent to the specified filters
|
|
||||||
*
|
|
||||||
* A reader implementation must override this method.
|
|
||||||
* If it returns false, it will be assumed that a fatal error has occured
|
|
||||||
* that prevents the reader from further operation; it will then be
|
|
||||||
* disabled and eventually deleted. When returning false, an implementation
|
|
||||||
* should also call Error to indicate what happened.
|
|
||||||
*/
|
|
||||||
virtual bool DoStartReading() = 0;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reader-specific method to add a filter.
|
|
||||||
*
|
|
||||||
* A reader implementation must override this method.
|
|
||||||
*/
|
|
||||||
virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields ) = 0;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reader-specific method to remove a filter.
|
|
||||||
*
|
|
||||||
* A reader implementation must override this method.
|
|
||||||
*/
|
|
||||||
virtual bool DoRemoveFilter( int id ) = 0;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reader-specific method implementing input finalization at
|
* Reader-specific method implementing input finalization at
|
||||||
|
@ -209,31 +151,26 @@ protected:
|
||||||
*
|
*
|
||||||
* If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised
|
* If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised
|
||||||
*
|
*
|
||||||
* @param id the input filter id for which the values are sent
|
|
||||||
*
|
|
||||||
* @param val list of threading::Values expected by the filter
|
* @param val list of threading::Values expected by the filter
|
||||||
*/
|
*/
|
||||||
void Put(int id, threading::Value* *val);
|
void Put(threading::Value* *val);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method allowing a reader to delete a specific value from a bro table.
|
* Method allowing a reader to delete a specific value from a bro table.
|
||||||
*
|
*
|
||||||
* If the receiving filter is an event, only a removed event is raised
|
* If the receiving filter is an event, only a removed event is raised
|
||||||
*
|
*
|
||||||
* @param id the input filter id for which the values are sent
|
|
||||||
*
|
|
||||||
* @param val list of threading::Values expected by the filter
|
* @param val list of threading::Values expected by the filter
|
||||||
*/
|
*/
|
||||||
void Delete(int id, threading::Value* *val);
|
void Delete(threading::Value* *val);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method allowing a reader to clear a value from a bro table.
|
* Method allowing a reader to clear a value from a bro table.
|
||||||
*
|
*
|
||||||
* If the receiving filter is an event, this is ignored.
|
* If the receiving filter is an event, this is ignored.
|
||||||
*
|
*
|
||||||
* @param id the input filter id for which the values are sent
|
|
||||||
*/
|
*/
|
||||||
void Clear(int id);
|
void Clear();
|
||||||
|
|
||||||
// Content-sending-functions (tracking mode): Only changed lines are propagated.
|
// Content-sending-functions (tracking mode): Only changed lines are propagated.
|
||||||
|
|
||||||
|
@ -243,11 +180,9 @@ protected:
|
||||||
*
|
*
|
||||||
* If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised.
|
* If the filter points to a table, the values are inserted into the table; if it points to an event, the event is raised.
|
||||||
*
|
*
|
||||||
* @param id the input filter id for which the values are sent
|
|
||||||
*
|
|
||||||
* @param val list of threading::Values expected by the filter
|
* @param val list of threading::Values expected by the filter
|
||||||
*/
|
*/
|
||||||
void SendEntry(int id, threading::Value* *vals);
|
void SendEntry(threading::Value* *vals);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method telling the manager, that the current list of entries sent by SendEntry is finished.
|
* Method telling the manager, that the current list of entries sent by SendEntry is finished.
|
||||||
|
@ -255,9 +190,8 @@ protected:
|
||||||
* For table filters, all entries that were not updated since the last EndCurrentSend will be deleted, because they are no longer
|
* For table filters, all entries that were not updated since the last EndCurrentSend will be deleted, because they are no longer
|
||||||
* present in the input source
|
* present in the input source
|
||||||
*
|
*
|
||||||
* @param id the input filter id for which the values are sent
|
|
||||||
*/
|
*/
|
||||||
void EndCurrentSend(int id);
|
void EndCurrentSend();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Triggered by regular heartbeat messages from the main thread.
|
* Triggered by regular heartbeat messages from the main thread.
|
||||||
|
|
|
@ -12,16 +12,17 @@ namespace input {
|
||||||
class InitMessage : public threading::InputMessage<ReaderBackend>
|
class InitMessage : public threading::InputMessage<ReaderBackend>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
InitMessage(ReaderBackend* backend, const string source, const int mode, const bool autostart)
|
InitMessage(ReaderBackend* backend, const string source, const int mode, const int num_fields, const threading::Field* const* fields)
|
||||||
: threading::InputMessage<ReaderBackend>("Init", backend),
|
: threading::InputMessage<ReaderBackend>("Init", backend),
|
||||||
source(source), mode(mode), autostart(autostart) { }
|
source(source), mode(mode), num_fields(num_fields), fields(fields) { }
|
||||||
|
|
||||||
virtual bool Process() { return Object()->Init(source, mode, autostart); }
|
virtual bool Process() { return Object()->Init(source, mode, num_fields, fields); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const string source;
|
const string source;
|
||||||
const int mode;
|
const int mode;
|
||||||
const bool autostart;
|
const int num_fields;
|
||||||
|
const threading::Field* const* fields;
|
||||||
};
|
};
|
||||||
|
|
||||||
class UpdateMessage : public threading::InputMessage<ReaderBackend>
|
class UpdateMessage : public threading::InputMessage<ReaderBackend>
|
||||||
|
@ -44,44 +45,6 @@ public:
|
||||||
virtual bool Process() { Object()->Finish(); return true; }
|
virtual bool Process() { Object()->Finish(); return true; }
|
||||||
};
|
};
|
||||||
|
|
||||||
class StartReadingMessage : public threading::InputMessage<ReaderBackend>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
StartReadingMessage(ReaderBackend* backend)
|
|
||||||
: threading::InputMessage<ReaderBackend>("StartReading", backend)
|
|
||||||
{ }
|
|
||||||
|
|
||||||
virtual bool Process() { Object()->StartReading(); return true; }
|
|
||||||
};
|
|
||||||
|
|
||||||
class AddFilterMessage : public threading::InputMessage<ReaderBackend>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
AddFilterMessage(ReaderBackend* backend, const int id, const int num_fields, const threading::Field* const* fields)
|
|
||||||
: threading::InputMessage<ReaderBackend>("AddFilter", backend),
|
|
||||||
id(id), num_fields(num_fields), fields(fields) { }
|
|
||||||
|
|
||||||
virtual bool Process() { return Object()->AddFilter(id, num_fields, fields); }
|
|
||||||
|
|
||||||
private:
|
|
||||||
const int id;
|
|
||||||
const int num_fields;
|
|
||||||
const threading::Field* const* fields;
|
|
||||||
};
|
|
||||||
|
|
||||||
class RemoveFilterMessage : public threading::InputMessage<ReaderBackend>
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
RemoveFilterMessage(ReaderBackend* backend, const int id)
|
|
||||||
: threading::InputMessage<ReaderBackend>("RemoveFilter", backend),
|
|
||||||
id(id) { }
|
|
||||||
|
|
||||||
virtual bool Process() { return Object()->RemoveFilter(id); }
|
|
||||||
|
|
||||||
private:
|
|
||||||
const int id;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
ReaderFrontend::ReaderFrontend(bro_int_t type) {
|
ReaderFrontend::ReaderFrontend(bro_int_t type) {
|
||||||
disabled = initialized = false;
|
disabled = initialized = false;
|
||||||
|
@ -95,7 +58,7 @@ ReaderFrontend::ReaderFrontend(bro_int_t type) {
|
||||||
ReaderFrontend::~ReaderFrontend() {
|
ReaderFrontend::~ReaderFrontend() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderFrontend::Init(string arg_source, int mode, bool autostart) {
|
void ReaderFrontend::Init(string arg_source, int mode, const int num_fields, const threading::Field* const* fields) {
|
||||||
if ( disabled )
|
if ( disabled )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -105,37 +68,33 @@ void ReaderFrontend::Init(string arg_source, int mode, bool autostart) {
|
||||||
source = arg_source;
|
source = arg_source;
|
||||||
initialized = true;
|
initialized = true;
|
||||||
|
|
||||||
backend->SendIn(new InitMessage(backend, arg_source, mode, autostart));
|
backend->SendIn(new InitMessage(backend, arg_source, mode, num_fields, fields));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderFrontend::Update() {
|
void ReaderFrontend::Update() {
|
||||||
if ( disabled )
|
if ( disabled )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if ( !initialized ) {
|
||||||
|
reporter->Error("Tried to call update on uninitialized reader");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
backend->SendIn(new UpdateMessage(backend));
|
backend->SendIn(new UpdateMessage(backend));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderFrontend::Finish() {
|
void ReaderFrontend::Finish() {
|
||||||
if ( disabled )
|
if ( disabled )
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if ( !initialized ) {
|
||||||
|
reporter->Error("Tried to call finish on uninitialized reader");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
backend->SendIn(new FinishMessage(backend));
|
backend->SendIn(new FinishMessage(backend));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderFrontend::AddFilter(const int id, const int arg_num_fields, const threading::Field* const* fields) {
|
|
||||||
if ( disabled )
|
|
||||||
return;
|
|
||||||
|
|
||||||
backend->SendIn(new AddFilterMessage(backend, id, arg_num_fields, fields));
|
|
||||||
}
|
|
||||||
|
|
||||||
void ReaderFrontend::RemoveFilter(const int id) {
|
|
||||||
if ( disabled )
|
|
||||||
return;
|
|
||||||
|
|
||||||
backend->SendIn(new RemoveFilterMessage(backend, id));
|
|
||||||
}
|
|
||||||
|
|
||||||
string ReaderFrontend::Name() const
|
string ReaderFrontend::Name() const
|
||||||
{
|
{
|
||||||
if ( source.size() )
|
if ( source.size() )
|
||||||
|
@ -144,13 +103,5 @@ string ReaderFrontend::Name() const
|
||||||
return ty_name + "/" + source;
|
return ty_name + "/" + source;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReaderFrontend::StartReading() {
|
|
||||||
if ( disabled )
|
|
||||||
return;
|
|
||||||
|
|
||||||
backend->SendIn(new StartReadingMessage(backend));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -49,18 +49,7 @@ public:
|
||||||
* See ReaderBackend::Init() for arguments.
|
* See ReaderBackend::Init() for arguments.
|
||||||
* This method must only be called from the main thread.
|
* This method must only be called from the main thread.
|
||||||
*/
|
*/
|
||||||
void Init(string arg_source, int mode, bool autostart);
|
void Init(string arg_source, int mode, const int arg_num_fields, const threading::Field* const* fields);
|
||||||
|
|
||||||
/**
|
|
||||||
* Start the reader.
|
|
||||||
*
|
|
||||||
* This methods starts the reader, after all necessary filters have been added.
|
|
||||||
* It is not necessary to call this function, if autostart has been set.
|
|
||||||
* If autostart has been set, the reader will be initialized automatically after the first filter has been added
|
|
||||||
*
|
|
||||||
* This method must only be called from the main thread.
|
|
||||||
*/
|
|
||||||
void StartReading();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force an update of the current input source. Actual action depends on
|
* Force an update of the current input source. Actual action depends on
|
||||||
|
@ -72,20 +61,6 @@ public:
|
||||||
*/
|
*/
|
||||||
void Update();
|
void Update();
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a filter to the current input source.
|
|
||||||
*
|
|
||||||
* See ReaderBackend::AddFilter for arguments.
|
|
||||||
*
|
|
||||||
* The method takes ownership of \a fields
|
|
||||||
*/
|
|
||||||
void AddFilter( const int id, const int arg_num_fields, const threading::Field* const* fields );
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a filter to the current input source.
|
|
||||||
*/
|
|
||||||
void RemoveFilter ( const int id );
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finalizes writing to this tream.
|
* Finalizes writing to this tream.
|
||||||
*
|
*
|
||||||
|
|
|
@ -76,7 +76,6 @@ Ascii::~Ascii()
|
||||||
|
|
||||||
void Ascii::DoFinish()
|
void Ascii::DoFinish()
|
||||||
{
|
{
|
||||||
filters.empty();
|
|
||||||
if ( file != 0 ) {
|
if ( file != 0 ) {
|
||||||
file->close();
|
file->close();
|
||||||
delete(file);
|
delete(file);
|
||||||
|
@ -84,9 +83,8 @@ void Ascii::DoFinish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Ascii::DoInit(string path, int arg_mode)
|
bool Ascii::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
|
||||||
{
|
{
|
||||||
started = false;
|
|
||||||
fname = path;
|
fname = path;
|
||||||
mode = arg_mode;
|
mode = arg_mode;
|
||||||
mtime = 0;
|
mtime = 0;
|
||||||
|
@ -107,17 +105,10 @@ bool Ascii::DoInit(string path, int arg_mode)
|
||||||
file->close();
|
file->close();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
num_fields = arg_num_fields;
|
||||||
|
fields = arg_fields;
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Ascii::DoStartReading() {
|
|
||||||
if ( started == true ) {
|
|
||||||
Error("Started twice");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
started = true;
|
|
||||||
switch ( mode ) {
|
switch ( mode ) {
|
||||||
case MANUAL:
|
case MANUAL:
|
||||||
case REREAD:
|
case REREAD:
|
||||||
|
@ -131,46 +122,11 @@ bool Ascii::DoStartReading() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Ascii::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) {
|
|
||||||
if ( HasFilter(id) ) {
|
|
||||||
Error("Filter was added twice, ignoring.");
|
|
||||||
return false; // no, we don't want to add this a second time
|
|
||||||
}
|
|
||||||
|
|
||||||
Filter f;
|
|
||||||
f.num_fields = arg_num_fields;
|
|
||||||
f.fields = fields;
|
|
||||||
|
|
||||||
filters[id] = f;
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Ascii::DoRemoveFilter ( int id ) {
|
|
||||||
if (!HasFilter(id) ) {
|
|
||||||
Error("Filter removal of nonexisting filter requested.");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert ( filters.erase(id) == 1 );
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool Ascii::HasFilter(int id) {
|
|
||||||
map<int, Filter>::iterator it = filters.find(id);
|
|
||||||
if ( it == filters.end() ) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool Ascii::ReadHeader(bool useCached) {
|
bool Ascii::ReadHeader(bool useCached) {
|
||||||
// try to read the header line...
|
// try to read the header line...
|
||||||
string line;
|
string line;
|
||||||
map<string, uint32_t> fields;
|
map<string, uint32_t> ifields;
|
||||||
|
|
||||||
if ( !useCached ) {
|
if ( !useCached ) {
|
||||||
if ( !GetLine(line) ) {
|
if ( !GetLine(line) ) {
|
||||||
|
@ -194,37 +150,35 @@ bool Ascii::ReadHeader(bool useCached) {
|
||||||
if ( !getline(splitstream, s, separator[0]))
|
if ( !getline(splitstream, s, separator[0]))
|
||||||
break;
|
break;
|
||||||
|
|
||||||
fields[s] = pos;
|
ifields[s] = pos;
|
||||||
pos++;
|
pos++;
|
||||||
}
|
}
|
||||||
|
|
||||||
//printf("Updating fields from description %s\n", line.c_str());
|
//printf("Updating fields from description %s\n", line.c_str());
|
||||||
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
|
columnMap.clear();
|
||||||
(*it).second.columnMap.clear();
|
|
||||||
|
for ( unsigned int i = 0; i < num_fields; i++ ) {
|
||||||
for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
|
const Field* field = fields[i];
|
||||||
const Field* field = (*it).second.fields[i];
|
|
||||||
|
map<string, uint32_t>::iterator fit = ifields.find(field->name);
|
||||||
map<string, uint32_t>::iterator fit = fields.find(field->name);
|
if ( fit == ifields.end() ) {
|
||||||
if ( fit == fields.end() ) {
|
Error(Fmt("Did not find requested field %s in input data file.", field->name.c_str()));
|
||||||
Error(Fmt("Did not find requested field %s in input data file.", field->name.c_str()));
|
return false;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
FieldMapping f(field->name, field->type, field->subtype, fields[field->name]);
|
|
||||||
if ( field->secondary_name != "" ) {
|
|
||||||
map<string, uint32_t>::iterator fit2 = fields.find(field->secondary_name);
|
|
||||||
if ( fit2 == fields.end() ) {
|
|
||||||
Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str()));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
f.secondary_position = fields[field->secondary_name];
|
|
||||||
}
|
|
||||||
(*it).second.columnMap.push_back(f);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
FieldMapping f(field->name, field->type, field->subtype, ifields[field->name]);
|
||||||
|
if ( field->secondary_name != "" ) {
|
||||||
|
map<string, uint32_t>::iterator fit2 = ifields.find(field->secondary_name);
|
||||||
|
if ( fit2 == ifields.end() ) {
|
||||||
|
Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str()));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
f.secondary_position = ifields[field->secondary_name];
|
||||||
|
}
|
||||||
|
columnMap.push_back(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// well, that seems to have worked...
|
// well, that seems to have worked...
|
||||||
return true;
|
return true;
|
||||||
|
@ -461,57 +415,55 @@ bool Ascii::DoUpdate() {
|
||||||
|
|
||||||
pos--; // for easy comparisons of max element.
|
pos--; // for easy comparisons of max element.
|
||||||
|
|
||||||
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
|
|
||||||
|
|
||||||
Value** fields = new Value*[(*it).second.num_fields];
|
Value** fields = new Value*[num_fields];
|
||||||
|
|
||||||
int fpos = 0;
|
int fpos = 0;
|
||||||
for ( vector<FieldMapping>::iterator fit = (*it).second.columnMap.begin();
|
for ( vector<FieldMapping>::iterator fit = columnMap.begin();
|
||||||
fit != (*it).second.columnMap.end();
|
fit != columnMap.end();
|
||||||
fit++ ){
|
fit++ ){
|
||||||
|
|
||||||
|
|
||||||
if ( (*fit).position > pos || (*fit).secondary_position > pos ) {
|
if ( (*fit).position > pos || (*fit).secondary_position > pos ) {
|
||||||
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position));
|
Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position));
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
|
|
||||||
Value* val = EntryToVal(stringfields[(*fit).position], *fit);
|
|
||||||
if ( val == 0 ) {
|
|
||||||
Error("Could not convert String value to Val");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( (*fit).secondary_position != -1 ) {
|
|
||||||
// we have a port definition :)
|
|
||||||
assert(val->type == TYPE_PORT );
|
|
||||||
// Error(Fmt("Got type %d != PORT with secondary position!", val->type));
|
|
||||||
|
|
||||||
val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]);
|
|
||||||
}
|
|
||||||
|
|
||||||
fields[fpos] = val;
|
|
||||||
|
|
||||||
fpos++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
|
Value* val = EntryToVal(stringfields[(*fit).position], *fit);
|
||||||
assert ( (unsigned int) fpos == (*it).second.num_fields );
|
if ( val == 0 ) {
|
||||||
|
Error("Could not convert String value to Val");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( (*fit).secondary_position != -1 ) {
|
||||||
|
// we have a port definition :)
|
||||||
|
assert(val->type == TYPE_PORT );
|
||||||
|
// Error(Fmt("Got type %d != PORT with secondary position!", val->type));
|
||||||
|
|
||||||
if ( mode == STREAM ) {
|
val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]);
|
||||||
Put((*it).first, fields);
|
|
||||||
} else {
|
|
||||||
SendEntry((*it).first, fields);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Do not do this, ownership changes to other thread
|
fields[fpos] = val;
|
||||||
* for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
|
|
||||||
delete fields[i];
|
fpos++;
|
||||||
}
|
|
||||||
delete [] fields;
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//printf("fpos: %d, second.num_fields: %d\n", fpos, (*it).second.num_fields);
|
||||||
|
assert ( (unsigned int) fpos == num_fields );
|
||||||
|
|
||||||
|
if ( mode == STREAM ) {
|
||||||
|
Put(fields);
|
||||||
|
} else {
|
||||||
|
SendEntry(fields);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Do not do this, ownership changes to other thread
|
||||||
|
* for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) {
|
||||||
|
delete fields[i];
|
||||||
|
}
|
||||||
|
delete [] fields;
|
||||||
|
*/
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -519,9 +471,7 @@ bool Ascii::DoUpdate() {
|
||||||
//file->seekg(0, ios::beg); // and seek to start.
|
//file->seekg(0, ios::beg); // and seek to start.
|
||||||
|
|
||||||
if ( mode != STREAM ) {
|
if ( mode != STREAM ) {
|
||||||
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
|
EndCurrentSend();
|
||||||
EndCurrentSend((*it).first);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -39,33 +39,22 @@ public:
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
virtual bool DoInit(string path, int mode);
|
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
|
||||||
|
|
||||||
virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
|
|
||||||
|
|
||||||
virtual bool DoRemoveFilter ( int id );
|
|
||||||
|
|
||||||
virtual void DoFinish();
|
virtual void DoFinish();
|
||||||
|
|
||||||
virtual bool DoUpdate();
|
virtual bool DoUpdate();
|
||||||
|
|
||||||
virtual bool DoStartReading();
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
virtual bool DoHeartbeat(double network_time, double current_time);
|
virtual bool DoHeartbeat(double network_time, double current_time);
|
||||||
|
|
||||||
struct Filter {
|
unsigned int num_fields;
|
||||||
unsigned int num_fields;
|
|
||||||
|
|
||||||
const threading::Field* const * fields; // raw mapping
|
const threading::Field* const * fields; // raw mapping
|
||||||
|
|
||||||
// map columns in the file to columns to send back to the manager
|
// map columns in the file to columns to send back to the manager
|
||||||
vector<FieldMapping> columnMap;
|
vector<FieldMapping> columnMap;
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
bool HasFilter(int id);
|
|
||||||
|
|
||||||
bool ReadHeader(bool useCached);
|
bool ReadHeader(bool useCached);
|
||||||
threading::Value* EntryToVal(string s, FieldMapping type);
|
threading::Value* EntryToVal(string s, FieldMapping type);
|
||||||
|
@ -75,8 +64,6 @@ private:
|
||||||
ifstream* file;
|
ifstream* file;
|
||||||
string fname;
|
string fname;
|
||||||
|
|
||||||
map<int, Filter> filters;
|
|
||||||
|
|
||||||
// Options set from the script-level.
|
// Options set from the script-level.
|
||||||
string separator;
|
string separator;
|
||||||
|
|
||||||
|
@ -91,7 +78,6 @@ private:
|
||||||
|
|
||||||
int mode;
|
int mode;
|
||||||
|
|
||||||
bool started;
|
|
||||||
time_t mtime;
|
time_t mtime;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
|
@ -40,7 +40,6 @@ Raw::~Raw()
|
||||||
|
|
||||||
void Raw::DoFinish()
|
void Raw::DoFinish()
|
||||||
{
|
{
|
||||||
filters.empty();
|
|
||||||
if ( file != 0 ) {
|
if ( file != 0 ) {
|
||||||
file->close();
|
file->close();
|
||||||
delete(file);
|
delete(file);
|
||||||
|
@ -48,9 +47,8 @@ void Raw::DoFinish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Raw::DoInit(string path, int arg_mode)
|
bool Raw::DoInit(string path, int arg_mode, int arg_num_fields, const Field* const* arg_fields)
|
||||||
{
|
{
|
||||||
started = false;
|
|
||||||
fname = path;
|
fname = path;
|
||||||
mode = arg_mode;
|
mode = arg_mode;
|
||||||
mtime = 0;
|
mtime = 0;
|
||||||
|
@ -66,16 +64,19 @@ bool Raw::DoInit(string path, int arg_mode)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
if ( arg_num_fields != 1 ) {
|
||||||
}
|
Error("Filter for raw reader contains more than one field. Filters for the raw reader may only contain exactly one string field. Filter ignored.");
|
||||||
|
|
||||||
bool Raw::DoStartReading() {
|
|
||||||
if ( started == true ) {
|
|
||||||
Error("Started twice");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ( fields[0]->type != TYPE_STRING ) {
|
||||||
|
Error("Filter for raw reader contains a field that is not of type string.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
num_fields = arg_num_fields;
|
||||||
|
fields = arg_fields;
|
||||||
|
|
||||||
started = true;
|
|
||||||
switch ( mode ) {
|
switch ( mode ) {
|
||||||
case MANUAL:
|
case MANUAL:
|
||||||
case REREAD:
|
case REREAD:
|
||||||
|
@ -89,51 +90,6 @@ bool Raw::DoStartReading() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Raw::DoAddFilter( int id, int arg_num_fields, const Field* const* fields ) {
|
|
||||||
|
|
||||||
if ( arg_num_fields != 1 ) {
|
|
||||||
Error("Filter for raw reader contains more than one field. Filters for the raw reader may only contain exactly one string field. Filter ignored.");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( fields[0]->type != TYPE_STRING ) {
|
|
||||||
Error("Filter for raw reader contains a field that is not of type string.");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( HasFilter(id) ) {
|
|
||||||
Error("Filter was added twice, ignoring");
|
|
||||||
return false; // no, we don't want to add this a second time
|
|
||||||
}
|
|
||||||
|
|
||||||
Filter f;
|
|
||||||
f.num_fields = arg_num_fields;
|
|
||||||
f.fields = fields;
|
|
||||||
|
|
||||||
filters[id] = f;
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Raw::DoRemoveFilter ( int id ) {
|
|
||||||
if (!HasFilter(id) ) {
|
|
||||||
Error("Filter removal of nonexisting filter requested.");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert ( filters.erase(id) == 1 );
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
bool Raw::HasFilter(int id) {
|
|
||||||
map<int, Filter>::iterator it = filters.find(id);
|
|
||||||
if ( it == filters.end() ) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Raw::GetLine(string& str) {
|
bool Raw::GetLine(string& str) {
|
||||||
while ( getline(*file, str, separator[0]) ) {
|
while ( getline(*file, str, separator[0]) ) {
|
||||||
|
@ -188,21 +144,16 @@ bool Raw::DoUpdate() {
|
||||||
|
|
||||||
string line;
|
string line;
|
||||||
while ( GetLine(line) ) {
|
while ( GetLine(line) ) {
|
||||||
for ( map<int, Filter>::iterator it = filters.begin(); it != filters.end(); it++ ) {
|
assert (num_fields == 1);
|
||||||
|
|
||||||
|
Value** fields = new Value*[1];
|
||||||
|
|
||||||
assert ((*it).second.num_fields == 1);
|
// filter has exactly one text field. convert to it.
|
||||||
|
Value* val = new Value(TYPE_STRING, true);
|
||||||
|
val->val.string_val = new string(line);
|
||||||
|
fields[0] = val;
|
||||||
|
|
||||||
Value** fields = new Value*[1];
|
Put(fields);
|
||||||
|
|
||||||
// filter has exactly one text field. convert to it.
|
|
||||||
Value* val = new Value(TYPE_STRING, true);
|
|
||||||
val->val.string_val = new string(line);
|
|
||||||
fields[0] = val;
|
|
||||||
|
|
||||||
Put((*it).first, fields);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -19,37 +19,21 @@ public:
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
virtual bool DoInit(string path, int mode);
|
virtual bool DoInit(string path, int mode, int arg_num_fields, const threading::Field* const* fields);
|
||||||
|
|
||||||
virtual bool DoAddFilter( int id, int arg_num_fields, const threading::Field* const* fields );
|
|
||||||
|
|
||||||
virtual bool DoRemoveFilter ( int id );
|
|
||||||
|
|
||||||
virtual void DoFinish();
|
virtual void DoFinish();
|
||||||
|
|
||||||
virtual bool DoUpdate();
|
virtual bool DoUpdate();
|
||||||
|
|
||||||
virtual bool DoStartReading();
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
virtual bool DoHeartbeat(double network_time, double current_time);
|
virtual bool DoHeartbeat(double network_time, double current_time);
|
||||||
|
|
||||||
struct Filter {
|
|
||||||
unsigned int num_fields;
|
|
||||||
|
|
||||||
const threading::Field* const * fields; // raw mapping
|
|
||||||
};
|
|
||||||
|
|
||||||
bool HasFilter(int id);
|
|
||||||
|
|
||||||
bool GetLine(string& str);
|
bool GetLine(string& str);
|
||||||
|
|
||||||
ifstream* file;
|
ifstream* file;
|
||||||
string fname;
|
string fname;
|
||||||
|
|
||||||
map<int, Filter> filters;
|
|
||||||
|
|
||||||
// Options set from the script-level.
|
// Options set from the script-level.
|
||||||
string separator;
|
string separator;
|
||||||
|
|
||||||
|
@ -58,8 +42,11 @@ private:
|
||||||
|
|
||||||
int mode;
|
int mode;
|
||||||
|
|
||||||
bool started;
|
|
||||||
time_t mtime;
|
time_t mtime;
|
||||||
|
|
||||||
|
unsigned int num_fields;
|
||||||
|
|
||||||
|
const threading::Field* const * fields; // raw mapping
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -182,10 +182,6 @@ enum Event %{
|
||||||
EVENT_REMOVED,
|
EVENT_REMOVED,
|
||||||
%}
|
%}
|
||||||
|
|
||||||
enum ID %{
|
|
||||||
Unknown,
|
|
||||||
%}
|
|
||||||
|
|
||||||
enum Mode %{
|
enum Mode %{
|
||||||
MANUAL = 0,
|
MANUAL = 0,
|
||||||
REREAD = 1,
|
REREAD = 1,
|
||||||
|
|
|
@ -14,10 +14,6 @@ redef InputAscii::empty_field = "EMPTY";
|
||||||
|
|
||||||
module A;
|
module A;
|
||||||
|
|
||||||
export {
|
|
||||||
redef enum Input::ID += { INPUT };
|
|
||||||
}
|
|
||||||
|
|
||||||
type Idx: record {
|
type Idx: record {
|
||||||
i: int;
|
i: int;
|
||||||
};
|
};
|
||||||
|
@ -45,12 +41,10 @@ global servers: table[int] of Val = table();
|
||||||
event bro_init()
|
event bro_init()
|
||||||
{
|
{
|
||||||
# first read in the old stuff into the table...
|
# first read in the old stuff into the table...
|
||||||
Input::create_stream(A::INPUT, [$source="input.log"]);
|
Input::add_table([$source="input.log", $name="ssh", $idx=Idx, $val=Val, $destination=servers]);
|
||||||
Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]);
|
Input::remove("ssh");
|
||||||
Input::remove_tablefilter(A::INPUT, "ssh");
|
|
||||||
Input::remove_stream(A::INPUT);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
event Input::update_finished(id: Input::ID) {
|
event Input::update_finished(id: string) {
|
||||||
print servers;
|
print servers;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue