diff --git a/doc/index.rst b/doc/index.rst
index ec67b76fd8..ed14be1dd2 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -24,6 +24,7 @@ Frameworks
    notice
    logging
+   input
    cluster
    signatures
diff --git a/doc/input.rst b/doc/input.rst
new file mode 100644
index 0000000000..78e96fe06e
--- /dev/null
+++ b/doc/input.rst
@@ -0,0 +1,190 @@
+=====================
+Loading Data into Bro
+=====================
+
+.. rst-class:: opening
+
+   Bro comes with a flexible input interface that allows reading
+   previously stored data. Data can either be read into Bro tables or
+   sent to scripts using events.
+   This document describes how the input framework can be used.
+
+.. contents::
+
+Terminology
+===========
+
+Bro's input framework is built around three main abstractions, which are
+very similar to the ones used in the logging framework:
+
+   Input Streams
+      An input stream corresponds to a single input source
+      (usually a text file). It defines the information necessary
+      to find the source (e.g. the filename).
+
+   Filters
+      Each input stream has a set of filters attached to it that
+      determine exactly what kind of information is read.
+      There are two different kinds of filters: event filters and table
+      filters.
+      Event filters generate an event for each line read
+      from the input source.
+      Table filters, on the other hand, read the input source into a Bro
+      table for easy later access.
+
+   Readers
+      A reader defines the input format for the specific input stream.
+      At the moment, Bro comes with only one type of reader, which can
+      read the tab-separated ASCII log files that were generated by the
+      logging framework.
+
+
+Basics
+======
+
+For examples, please look at the unit tests in
+``testing/btest/scripts/base/frameworks/input/``.
+
+A very basic example to open an input stream is:
+
+.. code:: bro
+
+   module Foo;
+
+   export {
+      # Create an ID for our new stream
+      redef enum Input::ID += { INPUT };
+   }
+
+   event bro_init() {
+      Input::create_stream(Foo::INPUT, [$source="input.log"]);
+   }
+
+The fields that can be set when creating a stream are:
+
+   ``source``
+      A mandatory string identifying the source of the data.
+      For the ASCII reader this is the filename.
+
+   ``reader``
+      The reader used for this stream. Default is ``READER_ASCII``.
+
+
+Filters
+=======
+
+Each filter defines the data fields that it wants to receive from the respective
+input file. Depending on the type of filter, events or a table are created from
+the data in the source file.
+
+Event Filters
+-------------
+
+Event filters are filters that generate an event for each line of the input source.
+
+For example, a simple filter retrieving the fields ``i`` and ``b`` from an input source
+could be defined as follows:
+
+.. code:: bro
+
+   type Val: record {
+      i: int;
+      b: bool;
+   };
+
+   event line(tpe: Input::Event, i: int, b: bool) {
+      # work with event data
+   }
+
+   event bro_init() {
+      # Input stream definition, etc.
+      ...
+
+      Input::add_eventfilter(Foo::INPUT, [$name="input", $fields=Val, $ev=line]);
+
+      # read the file after all filters have been set
+      Input::force_update(Foo::INPUT);
+   }
+
+The fields that can be set for an event filter are:
+
+   ``name``
+      A mandatory name for the filter that can later be used
+      to manipulate it further.
+
+   ``fields``
+      Name of a record type containing the fields that should be retrieved from
+      the input stream.
+
+   ``ev``
+      The event that is raised after a line has been read from the input source.
+      The first argument that is passed to the event is an Input::Event structure,
+      followed by the data, either inside of a record (if ``want_record`` is set) or as
+      individual fields.
+      The Input::Event structure indicates whether the received line is ``NEW``, has
+      been ``CHANGED`` or ``DELETED``. Since the ASCII reader cannot track this information
+      for event filters, the value is always ``NEW`` at the moment.
+
+   ``want_record``
+      Boolean value that defines if the event wants to receive the fields inside of
+      a single record value, or individually (default); see the example below.
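+For instance, to receive all fields of a line in a single record instead of as
+individual arguments, only the event signature and the filter definition change.
+The following is a sketch (the event and filter names are purely illustrative)
+that reuses the ``Val`` type defined above:
+
+.. code:: bro
+
+   event line_record(tpe: Input::Event, r: Val) {
+      # r$i and r$b hold the values read from the current line
+   }
+
+   event bro_init() {
+      Input::add_eventfilter(Foo::INPUT, [$name="input_rec", $fields=Val,
+                                          $ev=line_record, $want_record=T]);
+   }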
+Table Filters
+-------------
+
+Table filters are the second, more complex type of filter.
+
+Table filters store the information they read from an input source in a Bro table. For example,
+when reading a file that contains IP addresses and connection attempt information, one could use
+an approach similar to this:
+
+.. code:: bro
+
+   type Idx: record {
+      a: addr;
+   };
+
+   type Val: record {
+      tries: count;
+   };
+
+   global conn_attempts: table[addr] of count = table();
+
+   event bro_init() {
+      # Input stream definitions, etc.
+      ...
+
+      Input::add_tablefilter(Foo::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=conn_attempts]);
+
+      # read the file after all filters have been set
+      Input::force_update(Foo::INPUT);
+   }
+
+The table conn_attempts will then contain the information about connection attempts.
+
+The possible fields that can be set for a table filter are:
+
+   ``name``
+      A mandatory name for the filter that can later be used
+      to manipulate it further.
+
+   ``idx``
+      Record type that defines the index of the table.
+
+   ``val``
+      Record type that defines the values of the table.
+
+   ``want_record``
+      Defines if the values of the table should be stored as a record (default),
+      or as a simple value. Has to remain true if ``Val`` contains more than one element.
+
+   ``destination``
+      The destination table.
+
+   ``ev``
+      Optional event that is raised when values are added to, changed in, or deleted from the table.
+      Events are passed an Input::Event description as the first argument, the index record as the
+      second argument and the values as the third argument.
+
+   ``pred``
+      Optional predicate that can prevent entries from being added to the table and events
+      from being sent; see the example below.
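+As a sketch of how ``ev`` and ``pred`` can be combined with the connection-attempt
+example above (the handler and predicate names are purely illustrative;
+``$want_record=F`` is used so that the plain ``count`` value is passed around):
+
+.. code:: bro
+
+   event attempts_changed(tpe: Input::Event, idx: Idx, val: count) {
+      # raised for entries that are added to, changed in or removed from the table
+   }
+
+   function accept_entry(tpe: Input::Event, left: any, right: any): bool {
+      # a real predicate could inspect the index or value and return F to skip the entry
+      return T;
+   }
+
+   event bro_init() {
+      Input::add_tablefilter(Foo::INPUT, [$name="attempts", $idx=Idx, $val=Val,
+                                          $destination=conn_attempts, $want_record=F,
+                                          $ev=attempts_changed, $pred=accept_entry]);
+   }
+
+For one-shot reads there is also the convenience function ``Input::read_table``, which
+creates a temporary stream, adds a single table filter, forces one update and removes
+the stream again. A minimal sketch, reusing the definitions from above (the source
+filename is illustrative):
+
+.. code:: bro
+
+   event bro_init() {
+      Input::read_table([$source="conn_attempts.log"],
+                        [$name="attempts", $idx=Idx, $val=Val,
+                         $destination=conn_attempts, $want_record=F]);
+   }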
diff --git a/doc/scripts/DocSourcesList.cmake b/doc/scripts/DocSourcesList.cmake
index ade0add875..4ab3154ecd 100644
--- a/doc/scripts/DocSourcesList.cmake
+++ b/doc/scripts/DocSourcesList.cmake
@@ -19,6 +19,7 @@ rest_target(${psd} base/init-bare.bro internal)
 rest_target(${CMAKE_BINARY_DIR}/src base/bro.bif.bro)
 rest_target(${CMAKE_BINARY_DIR}/src base/const.bif.bro)
 rest_target(${CMAKE_BINARY_DIR}/src base/event.bif.bro)
+rest_target(${CMAKE_BINARY_DIR}/src base/input.bif.bro)
 rest_target(${CMAKE_BINARY_DIR}/src base/logging.bif.bro)
 rest_target(${CMAKE_BINARY_DIR}/src base/reporter.bif.bro)
 rest_target(${CMAKE_BINARY_DIR}/src base/strings.bif.bro)
@@ -31,6 +32,8 @@ rest_target(${psd} base/frameworks/cluster/setup-connections.bro)
 rest_target(${psd} base/frameworks/communication/main.bro)
 rest_target(${psd} base/frameworks/control/main.bro)
 rest_target(${psd} base/frameworks/dpd/main.bro)
+rest_target(${psd} base/frameworks/input/main.bro)
+rest_target(${psd} base/frameworks/input/readers/ascii.bro)
 rest_target(${psd} base/frameworks/intel/main.bro)
 rest_target(${psd} base/frameworks/logging/main.bro)
 rest_target(${psd} base/frameworks/logging/postprocessors/scp.bro)
diff --git a/scripts/base/frameworks/input/__load__.bro b/scripts/base/frameworks/input/__load__.bro
new file mode 100644
index 0000000000..a3315186d5
--- /dev/null
+++ b/scripts/base/frameworks/input/__load__.bro
@@ -0,0 +1,3 @@
+@load ./main
+@load ./readers/ascii
+
diff --git a/scripts/base/frameworks/input/main.bro b/scripts/base/frameworks/input/main.bro
new file mode 100644
index 0000000000..c76eba80b9
--- /dev/null
+++ b/scripts/base/frameworks/input/main.bro
@@ -0,0 +1,192 @@
+##! The input framework provides a way to read previously stored data either
+##! as an event stream or into a Bro table.
+
+module Input;
+
+export {
+	redef enum Input::ID += { TABLE_READ };
+
+	## The default input reader used. Defaults to `READER_ASCII`.
+	const default_reader = READER_ASCII &redef;
+
+	## Stream description type used for the `create_stream` method.
+	type StreamDescription: record {
+		## String that allows the reader to find the source.
+		## For `READER_ASCII`, this is the filename.
+		source: string;
+
+		## Reader to use for this stream.
+		reader: Reader &default=default_reader;
+	};
+
+	## TableFilter description type used for the `add_tablefilter` method.
+	type TableFilter: record {
+		## Descriptive name. Used to remove a filter at a later time.
+		name: string;
+
+		## Table which will contain the data read by the input framework.
+		destination: any;
+		## Record that defines the values used as the index of the table.
+		idx: any;
+		## Record that defines the values used as the values of the table.
+		## If val is undefined, destination has to be a set.
+		val: any &optional;
+		## Defines if the value of the table is a record (default), or a single value.
+		## Val can only contain one element when this is set to false.
+		want_record: bool &default=T;
+
+		## The event that is raised each time a value is added to, changed in or removed from the table.
+		## The event will receive an Input::Event enum as the first argument, the idx record as the second argument
+		## and the value (record) as the third argument.
+		ev: any &optional; # event containing idx, val as values.
+
+		## Predicate function that can decide if an insertion, update or removal should really be executed.
+		## Parameters are the same as for the event. If true is returned, the update is performed;
+		## if false is returned, it is skipped.
+		pred: function(typ: Input::Event, left: any, right: any): bool &optional;
+	};
+
+	## EventFilter description type used for the `add_eventfilter` method.
+	type EventFilter: record {
+		## Descriptive name. Used to remove a filter at a later time.
+		name: string;
+
+		## Record describing the fields to be retrieved from the source input.
+		fields: any;
+		## If want_record is false (default), the event receives each value in fields as a separate argument.
+		## If it is set to true, the event receives all fields in a single record value.
+		want_record: bool &default=F;
+
+		## The event that is raised each time a new line is received from the reader.
+		## The event will receive an Input::Event enum as the first element, and the fields as the following arguments.
+		ev: any;
+
+	};
+
+	#const no_filter: Filter = [$name="", $idx="", $val="", $destination=""]; # Sentinel.
+
+	## Create a new input stream from a given source. Returns true on success.
+	##
+	## id: `Input::ID` enum value identifying this stream
+	## description: `StreamDescription` record describing the source.
+	global create_stream: function(id: Input::ID, description: Input::StreamDescription) : bool;
+
+	## Remove a current input stream. Returns true on success.
+	##
+	## id: `Input::ID` enum value identifying the stream to be removed
+	global remove_stream: function(id: Input::ID) : bool;
+
+	## Forces the current input to be checked for changes.
+	##
+	## id: `Input::ID` enum value identifying the stream
+	global force_update: function(id: Input::ID) : bool;
+
+	## Adds a table filter to a specific input stream. Returns true on success.
+	##
+	## id: `Input::ID` enum value identifying the stream
+	## filter: the `TableFilter` record describing the filter.
+	global add_tablefilter: function(id: Input::ID, filter: Input::TableFilter) : bool;
+
+	## Removes a named table filter from a specific input stream. Returns true on success.
+	##
+	## id: `Input::ID` enum value identifying the stream
+	## name: the name of the filter to be removed.
+	global remove_tablefilter: function(id: Input::ID, name: string) : bool;
+
+	## Adds an event filter to a specific input stream. Returns true on success.
+	##
+	## id: `Input::ID` enum value identifying the stream
+	## filter: the `EventFilter` record describing the filter.
+	global add_eventfilter: function(id: Input::ID, filter: Input::EventFilter) : bool;
+
+	## Removes a named event filter from a specific input stream. Returns true on success.
+	##
+	## id: `Input::ID` enum value identifying the stream
+	## name: the name of the filter to be removed.
+	global remove_eventfilter: function(id: Input::ID, name: string) : bool;
+	#global get_filter: function(id: ID, name: string) : Filter;
+
+	## Convenience function for reading a specific input source exactly once using
+	## exactly one table filter.
+	##
+	## description: `StreamDescription` record describing the source.
+	## filter: the `TableFilter` record describing the filter.
+ global read_table: function(description: Input::StreamDescription, filter: Input::TableFilter) : bool; + +} + +@load base/input.bif + + +module Input; + +#global filters: table[ID, string] of Filter; + +function create_stream(id: Input::ID, description: Input::StreamDescription) : bool + { + return __create_stream(id, description); + } + +function remove_stream(id: Input::ID) : bool + { + return __remove_stream(id); + } + +function force_update(id: Input::ID) : bool + { + return __force_update(id); + } + +function add_tablefilter(id: Input::ID, filter: Input::TableFilter) : bool + { +# filters[id, filter$name] = filter; + return __add_tablefilter(id, filter); + } + +function remove_tablefilter(id: Input::ID, name: string) : bool + { +# delete filters[id, name]; + return __remove_tablefilter(id, name); + } + +function add_eventfilter(id: Input::ID, filter: Input::EventFilter) : bool + { +# filters[id, filter$name] = filter; + return __add_eventfilter(id, filter); + } + +function remove_eventfilter(id: Input::ID, name: string) : bool + { +# delete filters[id, name]; + return __remove_eventfilter(id, name); + } + +function read_table(description: Input::StreamDescription, filter: Input::TableFilter) : bool { + local ok: bool = T; + # since we create and delete it ourselves this should be ok... at least for singlethreaded operation + local id: Input::ID = Input::TABLE_READ; + + ok = create_stream(id, description); + if ( ok ) { + ok = add_tablefilter(id, filter); + } + if ( ok ) { + ok = force_update(id); + } + if ( ok ) { + ok = remove_stream(id); + } else { + remove_stream(id); + } + + return ok; +} + +#function get_filter(id: ID, name: string) : Filter +# { +# if ( [id, name] in filters ) +# return filters[id, name]; +# +# return no_filter; +# } diff --git a/scripts/base/frameworks/input/readers/ascii.bro b/scripts/base/frameworks/input/readers/ascii.bro new file mode 100644 index 0000000000..14c04757f7 --- /dev/null +++ b/scripts/base/frameworks/input/readers/ascii.bro @@ -0,0 +1,19 @@ +##! Interface for the ascii input reader. + +module InputAscii; + +export { + ## Separator between fields. + ## Please note that the separator has to be exactly one character long + const separator = "\t" &redef; + + ## Separator between set elements. + ## Please note that the separator has to be exactly one character long + const set_separator = "," &redef; + + ## String to use for empty fields. + const empty_field = "(empty)" &redef; + + ## String to use for an unset &optional field. + const unset_field = "-" &redef; +} diff --git a/scripts/base/init-bare.bro b/scripts/base/init-bare.bro index 200947938d..07b6451bfb 100644 --- a/scripts/base/init-bare.bro +++ b/scripts/base/init-bare.bro @@ -2337,3 +2337,6 @@ const snaplen = 8192 &redef; # Load the logging framework here because it uses fairly deep integration with # BiFs and script-land defined types. 
@load base/frameworks/logging + +@load base/frameworks/input + diff --git a/src/Attr.cc b/src/Attr.cc index 40c6c1a75c..6c6ac1983b 100644 --- a/src/Attr.cc +++ b/src/Attr.cc @@ -17,7 +17,7 @@ const char* attr_name(attr_tag t) "&persistent", "&synchronized", "&postprocessor", "&encrypt", "&match", "&disable_print_hook", "&raw_output", "&mergeable", "&priority", - "&group", "&log", "&error_handler", "(&tracked)", + "&group", "&log", "&error_handler", "&type_column", "(&tracked)", }; return attr_names[int(t)]; @@ -420,6 +420,26 @@ void Attributes::CheckAttr(Attr* a) Error("&log applied to a type that cannot be logged"); break; + case ATTR_TYPE_COLUMN: + { + if ( type->Tag() != TYPE_PORT ) + { + Error("type_column tag only applicable to ports"); + break; + } + + BroType* atype = a->AttrExpr()->Type(); + + if ( atype->Tag() != TYPE_STRING ) { + Error("type column needs to have a string argument"); + break; + } + + + break; + } + + default: BadTag("Attributes::CheckAttr", attr_name(a->Tag())); } diff --git a/src/Attr.h b/src/Attr.h index 6c835dc61c..471acfe4ba 100644 --- a/src/Attr.h +++ b/src/Attr.h @@ -35,6 +35,7 @@ typedef enum { ATTR_GROUP, ATTR_LOG, ATTR_ERROR_HANDLER, + ATTR_TYPE_COLUMN, // for input framework ATTR_TRACKED, // hidden attribute, tracked by NotifierRegistry #define NUM_ATTRS (int(ATTR_TRACKED) + 1) } attr_tag; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7a3cc4babf..6a84053bce 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -142,6 +142,7 @@ endmacro(GET_BIF_OUTPUT_FILES) set(BIF_SRCS bro.bif logging.bif + input.bif event.bif const.bif types.bif @@ -419,6 +420,12 @@ set(bro_SRCS logging/writers/Ascii.cc logging/writers/None.cc + input/Manager.cc + input/ReaderBackend.cc + input/ReaderFrontend.cc + input/readers/Ascii.cc + + ${dns_SRCS} ${openssl_SRCS} ) diff --git a/src/Func.cc b/src/Func.cc index 65cb22b09d..829bc89238 100644 --- a/src/Func.cc +++ b/src/Func.cc @@ -523,11 +523,13 @@ void builtin_error(const char* msg, BroObj* arg) #include "bro.bif.func_h" #include "logging.bif.func_h" +#include "input.bif.func_h" #include "reporter.bif.func_h" #include "strings.bif.func_h" #include "bro.bif.func_def" #include "logging.bif.func_def" +#include "input.bif.func_def" #include "reporter.bif.func_def" #include "strings.bif.func_def" @@ -542,6 +544,7 @@ void init_builtin_funcs() #include "bro.bif.func_init" #include "logging.bif.func_init" +#include "input.bif.func_init" #include "reporter.bif.func_init" #include "strings.bif.func_init" diff --git a/src/NetVar.cc b/src/NetVar.cc index 5aed213508..7285a8433c 100644 --- a/src/NetVar.cc +++ b/src/NetVar.cc @@ -257,6 +257,7 @@ StringVal* cmd_line_bpf_filter; #include "types.bif.netvar_def" #include "event.bif.netvar_def" #include "logging.bif.netvar_def" +#include "input.bif.netvar_def" #include "reporter.bif.netvar_def" void init_event_handlers() @@ -317,6 +318,7 @@ void init_net_var() #include "const.bif.netvar_init" #include "types.bif.netvar_init" #include "logging.bif.netvar_init" +#include "input.bif.netvar_init" #include "reporter.bif.netvar_init" conn_id = internal_type("conn_id")->AsRecordType(); diff --git a/src/NetVar.h b/src/NetVar.h index 4a513a8a53..488d842aec 100644 --- a/src/NetVar.h +++ b/src/NetVar.h @@ -266,6 +266,7 @@ extern void init_net_var(); #include "types.bif.netvar_h" #include "event.bif.netvar_h" #include "logging.bif.netvar_h" +#include "input.bif.netvar_h" #include "reporter.bif.netvar_h" #endif diff --git a/src/Val.h b/src/Val.h index d851be311b..3ae0bc3334 100644 --- a/src/Val.h 
+++ b/src/Val.h
@@ -841,6 +841,9 @@ public:
 		timer = 0;
 		}
 
+	HashKey* ComputeHash(const Val* index) const
+		{ return table_hash->ComputeHash(index, 1); }
+
 protected:
 	friend class Val;
 	friend class StateAccess;
@@ -851,8 +854,6 @@ protected:
 	void CheckExpireAttr(attr_tag at);
 	int ExpandCompoundAndInit(val_list* vl, int k, Val* new_val);
 	int CheckAndAssign(Val* index, Val* new_val, Opcode op = OP_ASSIGN);
-	HashKey* ComputeHash(const Val* index) const
-		{ return table_hash->ComputeHash(index, 1); }
 
 	bool AddProperties(Properties arg_state);
 	bool RemoveProperties(Properties arg_state);
diff --git a/src/input.bif b/src/input.bif
new file mode 100644
index 0000000000..2e9324ec56
--- /dev/null
+++ b/src/input.bif
@@ -0,0 +1,64 @@
+# functions and types for the input framework
+
+module Input;
+
+%%{
+#include "input/Manager.h"
+#include "NetVar.h"
+%%}
+
+type StreamDescription: record;
+type TableFilter: record;
+type EventFilter: record;
+
+function Input::__create_stream%(id: Input::ID, description: Input::StreamDescription%) : bool
+	%{
+	input::ReaderFrontend *the_reader = input_mgr->CreateStream(id->AsEnumVal(), description->AsRecordVal());
+	return new Val( the_reader != 0, TYPE_BOOL );
+	%}
+
+function Input::__remove_stream%(id: Input::ID%) : bool
+	%{
+	bool res = input_mgr->RemoveStream(id->AsEnumVal());
+	return new Val( res, TYPE_BOOL );
+	%}
+
+function Input::__force_update%(id: Input::ID%) : bool
+	%{
+	bool res = input_mgr->ForceUpdate(id->AsEnumVal());
+	return new Val( res, TYPE_BOOL );
+	%}
+
+function Input::__add_tablefilter%(id: Input::ID, filter: Input::TableFilter%) : bool
+	%{
+	bool res = input_mgr->AddTableFilter(id->AsEnumVal(), filter->AsRecordVal());
+	return new Val( res, TYPE_BOOL );
+	%}
+
+function Input::__remove_tablefilter%(id: Input::ID, name: string%) : bool
+	%{
+	bool res = input_mgr->RemoveTableFilter(id->AsEnumVal(), name->AsString()->CheckString());
+	return new Val( res, TYPE_BOOL);
+	%}
+
+function Input::__add_eventfilter%(id: Input::ID, filter: Input::EventFilter%) : bool
+	%{
+	bool res = input_mgr->AddEventFilter(id->AsEnumVal(), filter->AsRecordVal());
+	return new Val( res, TYPE_BOOL );
+	%}
+
+function Input::__remove_eventfilter%(id: Input::ID, name: string%) : bool
+	%{
+	bool res = input_mgr->RemoveEventFilter(id->AsEnumVal(), name->AsString()->CheckString());
+	return new Val( res, TYPE_BOOL);
+	%}
+
+# Options for the ASCII reader
+
+module InputAscii;
+
+const separator: string;
+const set_separator: string;
+const empty_field: string;
+const unset_field: string;
+
diff --git a/src/input/Manager.cc b/src/input/Manager.cc
new file mode 100644
index 0000000000..24bc464daf
--- /dev/null
+++ b/src/input/Manager.cc
@@ -0,0 +1,1463 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+
+#include
+
+#include "Manager.h"
+#include "Event.h"
+#include "EventHandler.h"
+#include "NetVar.h"
+#include "Net.h"
+
+
+#include "InputReader.h"
+
+#include "InputReaderAscii.h"
+
+#include "CompHash.h"
+
+using namespace input;
+using threading::Value;
+using threading::Field;
+
+struct InputHash {
+	HashKey* valhash;
+	HashKey* idxkey; // does not need ref or whatever - if it is present here, it is also still present in the TableVal.
+}; + +declare(PDict, InputHash); + +class Manager::Filter { +public: + EnumVal* id; + string name; + + FilterType filter_type; // to distinguish between event and table filters + + virtual ~Filter(); +}; + +class Manager::TableFilter: public Manager::Filter { +public: + + unsigned int num_idx_fields; + unsigned int num_val_fields; + bool want_record; + EventHandlerPtr table_event; + + TableVal* tab; + RecordType* rtype; + RecordType* itype; + + PDict(InputHash)* currDict; + PDict(InputHash)* lastDict; + + Func* pred; + + EventHandlerPtr event; + + TableFilter(); + ~TableFilter(); +}; + +class Manager::EventFilter: public Manager::Filter { +public: + EventHandlerPtr event; + + RecordType* fields; + unsigned int num_fields; + + bool want_record; + EventFilter(); +}; + +Manager::TableFilter::TableFilter() { + filter_type = TABLE_FILTER; + + tab = 0; + itype = 0; + rtype = 0; +} + +Manager::EventFilter::EventFilter() { + filter_type = EVENT_FILTER; +} + +Manager::Filter::~Filter() { + Unref(id); +} + +Manager::TableFilter::~TableFilter() { + Unref(tab); + Unref(itype); + if ( rtype ) // can be 0 for sets + Unref(rtype); + + delete currDict; + delete lastDict; +} + +struct Manager::ReaderInfo { + EnumVal* id; + EnumVal* type; + InputReader* reader; + + //list events; // events we fire when "something" happens + map filters; // filters that can prevent our actions + + bool HasFilter(int id); + + ~ReaderInfo(); + }; + +Manager::ReaderInfo::~ReaderInfo() { + map::iterator it = filters.begin(); + + while ( it != filters.end() ) { + delete (*it).second; + ++it; + } + + Unref(type); + Unref(id); + + delete(reader); +} + +bool Manager::ReaderInfo::HasFilter(int id) { + map::iterator it = filters.find(id); + if ( it == filters.end() ) { + return false; + } + return true; +} + + +struct InputReaderDefinition { + bro_int_t type; // the type + const char *name; // descriptive name for error messages + bool (*init)(); // optional one-time inifializing function + InputReader* (*factory)(); // factory function for creating instances +}; + +InputReaderDefinition input_readers[] = { + { BifEnum::Input::READER_ASCII, "Ascii", 0, InputReaderAscii::Instantiate }, + + // End marker + { BifEnum::Input::READER_DEFAULT, "None", 0, (InputReader* (*)())0 } +}; + +Manager::Manager() +{ +} + +// create a new input reader object to be used at whomevers leisure lateron. +InputReader* Manager::CreateStream(EnumVal* id, RecordVal* description) +{ + InputReaderDefinition* ir = input_readers; + + RecordType* rtype = description->Type()->AsRecordType(); + if ( ! same_type(rtype, BifType::Record::Input::StreamDescription, 0) ) + { + reporter->Error("Streamdescription argument not of right type"); + return 0; + } + + EnumVal* reader = description->LookupWithDefault(rtype->FieldOffset("reader"))->AsEnumVal(); + + while ( true ) { + if ( ir->type == BifEnum::Input::READER_DEFAULT ) + { + reporter->Error("unknown reader when creating reader"); + return 0; + } + + if ( ir->type != reader->AsEnum() ) { + // no, didn't find the right one... + ++ir; + continue; + } + + + // call init function of writer if presnt + if ( ir->init ) + { + if ( (*ir->init)() ) + { + //clear it to be not called again + ir->init = 0; + } else { + // ohok. init failed, kill factory for all eternity + ir->factory = 0; + DBG_LOG(DBG_LOGGING, "failed to init input class %s", ir->name); + return 0; + } + + } + + if ( !ir->factory ) + // no factory? + return 0; + + // all done. break. 
+ break; + } + + assert(ir->factory); + InputReader* reader_obj = (*ir->factory)(); + assert(reader_obj); + + // get the source... + const BroString* bsource = description->Lookup(rtype->FieldOffset("source"))->AsString(); + string source((const char*) bsource->Bytes(), bsource->Len()); + + ReaderInfo* info = new ReaderInfo; + info->reader = reader_obj; + info->type = reader->AsEnumVal(); // ref'd by lookupwithdefault + info->id = id->Ref()->AsEnumVal(); + + readers.push_back(info); + + int success = reader_obj->Init(source); + if ( success == false ) { + assert( RemoveStream(id) ); + return 0; + } + success = reader_obj->Update(); + if ( success == false ) { + assert ( RemoveStream(id) ); + return 0; + } + + return reader_obj; + +} + +bool Manager::AddEventFilter(EnumVal *id, RecordVal* fval) { + ReaderInfo *i = FindReader(id); + if ( i == 0 ) { + reporter->Error("Stream not found"); + return false; + } + + RecordType* rtype = fval->Type()->AsRecordType(); + if ( ! same_type(rtype, BifType::Record::Input::EventFilter, 0) ) + { + reporter->Error("filter argument not of right type"); + return false; + } + + Val* name = fval->Lookup(rtype->FieldOffset("name")); + RecordType *fields = fval->Lookup(rtype->FieldOffset("fields"))->AsType()->AsTypeType()->Type()->AsRecordType(); + + Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); + + Val* event_val = fval->Lookup(rtype->FieldOffset("ev")); + Func* event = event_val->AsFunc(); + + { + FuncType* etype = event->FType()->AsFuncType(); + + if ( ! etype->IsEvent() ) { + reporter->Error("stream event is a function, not an event"); + return false; + } + + const type_list* args = etype->ArgTypes()->Types(); + + if ( args->length() < 2 ) { + reporter->Error("event takes not enough arguments"); + return false; + } + + if ( ! 
same_type((*args)[0], BifType::Enum::Input::Event, 0) ) + { + reporter->Error("events first attribute must be of type Input::Event"); + return false; + } + + if ( want_record->InternalInt() == 0 ) { + if ( args->length() != fields->NumFields() + 1 ) { + reporter->Error("events has wrong number of arguments"); + return false; + } + + for ( int i = 0; i < fields->NumFields(); i++ ) { + if ( !same_type((*args)[i+1], fields->FieldType(i) ) ) { + reporter->Error("Incompatible type for event"); + return false; + } + } + + } else if ( want_record->InternalInt() == 1 ) { + if ( args->length() != 2 ) { + reporter->Error("events has wrong number of arguments"); + return false; + } + + if ( !same_type((*args)[1], fields ) ) { + reporter->Error("Incompatible type for event"); + return false; + } + + } else { + assert(false); + } + + } + + + vector fieldsV; // vector, because UnrollRecordType needs it + + bool status = !UnrollRecordType(&fieldsV, fields, ""); + + if ( status ) { + reporter->Error("Problem unrolling"); + return false; + } + + + LogField** logf = new LogField*[fieldsV.size()]; + for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { + logf[i] = fieldsV[i]; + } + + EventFilter* filter = new EventFilter(); + filter->name = name->AsString()->CheckString(); + filter->id = id->Ref()->AsEnumVal(); + filter->num_fields = fieldsV.size(); + filter->fields = fields->Ref()->AsRecordType(); + filter->event = event_registry->Lookup(event->GetID()->Name()); + filter->want_record = ( want_record->InternalInt() == 1 ); + Unref(want_record); // ref'd by lookupwithdefault + + int filterid = 0; + if ( i->filters.size() > 0 ) { + filterid = i->filters.rbegin()->first + 1; // largest element is at beginning of map-> new id = old id + 1-> + } + i->filters[filterid] = filter; + i->reader->AddFilter( filterid, fieldsV.size(), logf ); + + return true; +} + +bool Manager::AddTableFilter(EnumVal *id, RecordVal* fval) { + ReaderInfo *i = FindReader(id); + if ( i == 0 ) { + reporter->Error("Stream not found"); + return false; + } + + RecordType* rtype = fval->Type()->AsRecordType(); + if ( ! same_type(rtype, BifType::Record::Input::TableFilter, 0) ) + { + reporter->Error("filter argument not of right type"); + return false; + } + + + Val* name = fval->Lookup(rtype->FieldOffset("name")); + Val* pred = fval->Lookup(rtype->FieldOffset("pred")); + + RecordType *idx = fval->Lookup(rtype->FieldOffset("idx"))->AsType()->AsTypeType()->Type()->AsRecordType(); + RecordType *val = 0; + if ( fval->Lookup(rtype->FieldOffset("val")) != 0 ) { + val = fval->Lookup(rtype->FieldOffset("val"))->AsType()->AsTypeType()->Type()->AsRecordType(); + } + TableVal *dst = fval->Lookup(rtype->FieldOffset("destination"))->AsTableVal(); + + Val *want_record = fval->LookupWithDefault(rtype->FieldOffset("want_record")); + + Val* event_val = fval->Lookup(rtype->FieldOffset("ev")); + Func* event = event_val ? event_val->AsFunc() : 0; + + if ( event ) { + FuncType* etype = event->FType()->AsFuncType(); + + if ( ! etype->IsEvent() ) { + reporter->Error("stream event is a function, not an event"); + return false; + } + + const type_list* args = etype->ArgTypes()->Types(); + + if ( args->length() != 3 ) + { + reporter->Error("Table event must take 3 arguments"); + return false; + } + + if ( ! same_type((*args)[0], BifType::Enum::Input::Event, 0) ) + { + reporter->Error("table events first attribute must be of type Input::Event"); + return false; + } + + if ( ! 
same_type((*args)[1], idx) ) + { + reporter->Error("table events index attributes do not match"); + return false; + } + + if ( want_record->InternalInt() == 1 && ! same_type((*args)[2], val) ) + { + reporter->Error("table events value attributes do not match"); + return false; + } else if ( want_record->InternalInt() == 0 && !same_type((*args)[2], val->FieldType(0) ) ) { + reporter->Error("table events value attribute does not match"); + return false; + } + assert(want_record->InternalInt() == 1 || want_record->InternalInt() == 0); + + } + + vector fieldsV; // vector, because we don't know the length beforehands + + bool status = !UnrollRecordType(&fieldsV, idx, ""); + + int idxfields = fieldsV.size(); + + if ( val ) // if we are not a set + status = status || !UnrollRecordType(&fieldsV, val, ""); + + int valfields = fieldsV.size() - idxfields; + + if ( !val ) + assert(valfields == 0); + + if ( status ) { + reporter->Error("Problem unrolling"); + return false; + } + + + LogField** fields = new LogField*[fieldsV.size()]; + for ( unsigned int i = 0; i < fieldsV.size(); i++ ) { + fields[i] = fieldsV[i]; + } + + TableFilter* filter = new TableFilter(); + filter->name = name->AsString()->CheckString(); + filter->id = id->Ref()->AsEnumVal(); + filter->pred = pred ? pred->AsFunc() : 0; + filter->num_idx_fields = idxfields; + filter->num_val_fields = valfields; + filter->tab = dst->Ref()->AsTableVal(); + filter->rtype = val ? val->Ref()->AsRecordType() : 0; + filter->itype = idx->Ref()->AsRecordType(); + filter->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0; + filter->currDict = new PDict(InputHash); + filter->lastDict = new PDict(InputHash); + filter->want_record = ( want_record->InternalInt() == 1 ); + Unref(want_record); // ref'd by lookupwithdefault + + if ( valfields > 1 ) { + assert(filter->want_record); + } + + int filterid = 0; + if ( i->filters.size() > 0 ) { + filterid = i->filters.rbegin()->first + 1; // largest element is at beginning of map-> new id = old id + 1-> + } + i->filters[filterid] = filter; + i->reader->AddFilter( filterid, fieldsV.size(), fields ); + + return true; +} + + +bool Manager::IsCompatibleType(BroType* t, bool atomic_only) + { + if ( ! t ) + return false; + + switch ( t->Tag() ) { + case TYPE_BOOL: + case TYPE_INT: + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + case TYPE_SUBNET: + case TYPE_ADDR: + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + case TYPE_ENUM: + case TYPE_STRING: + return true; + + case TYPE_RECORD: + return ! atomic_only; + + case TYPE_TABLE: + { + if ( atomic_only ) + return false; + + if ( ! 
t->IsSet() ) + return false; + + return IsCompatibleType(t->AsSetType()->Indices()->PureType(), true); + } + + case TYPE_VECTOR: + { + if ( atomic_only ) + return false; + + return IsCompatibleType(t->AsVectorType()->YieldType(), true); + } + + default: + return false; + } + + return false; +} + + +bool Manager::RemoveStream(const EnumVal* id) { + ReaderInfo *i = 0; + for ( vector::iterator s = readers.begin(); s != readers.end(); ++s ) + { + if ( (*s)->id == id ) + { + i = (*s); + readers.erase(s); // remove from vector + break; + } + } + + if ( i == 0 ) { + return false; // not found + } + + i->reader->Finish(); + + delete(i); + + return true; +} + +bool Manager::UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend) { + for ( int i = 0; i < rec->NumFields(); i++ ) + { + + if ( !IsCompatibleType(rec->FieldType(i)) ) { + reporter->Error("Incompatible type \"%s\" in table definition for InputReader", type_name(rec->FieldType(i)->Tag())); + return false; + } + + if ( rec->FieldType(i)->Tag() == TYPE_RECORD ) + { + string prep = nameprepend + rec->FieldName(i) + "."; + + if ( !UnrollRecordType(fields, rec->FieldType(i)->AsRecordType(), prep) ) + { + return false; + } + + } else { + LogField* field = new LogField(); + field->name = nameprepend + rec->FieldName(i); + field->type = rec->FieldType(i)->Tag(); + if ( field->type == TYPE_TABLE ) { + field->subtype = rec->FieldType(i)->AsSetType()->Indices()->PureType()->Tag(); + } else if ( field->type == TYPE_VECTOR ) { + field->subtype = rec->FieldType(i)->AsVectorType()->YieldType()->Tag(); + } else if ( field->type == TYPE_PORT && + rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN) ) { + // we have an annotation for the second column + + Val* c = rec->FieldDecl(i)->FindAttr(ATTR_TYPE_COLUMN)->AttrExpr()->Eval(0); + + assert(c); + assert(c->Type()->Tag() == TYPE_STRING); + + field->secondary_name = c->AsStringVal()->AsString()->CheckString(); + } + + fields->push_back(field); + } + } + + return true; +} + +bool Manager::ForceUpdate(const EnumVal* id) +{ + ReaderInfo *i = FindReader(id); + if ( i == 0 ) { + reporter->Error("Reader not found"); + return false; + } + + return i->reader->Update(); +} + +bool Manager::RemoveTableFilter(EnumVal* id, const string &name) { + ReaderInfo *i = FindReader(id); + if ( i == 0 ) { + reporter->Error("Reader not found"); + return false; + } + + map::iterator it = i->filters.find(id->InternalInt()); + if ( it == i->filters.end() ) { + return false; + } + + if ( i->filters[id->InternalInt()]->filter_type != TABLE_FILTER ) { + // wrong type; + return false; + } + + delete (*it).second; + i->filters.erase(it); + return true; +} + +bool Manager::RemoveEventFilter(EnumVal* id, const string &name) { + ReaderInfo *i = FindReader(id); + if ( i == 0 ) { + reporter->Error("Reader not found"); + return false; + } + + map::iterator it = i->filters.find(id->InternalInt()); + if ( it == i->filters.end() ) { + return false; + } + + if ( i->filters[id->InternalInt()]->filter_type != EVENT_FILTER ) { + // wrong type; + return false; + } + + delete (*it).second; + i->filters.erase(it); + return true; +} + +Val* Manager::LogValToIndexVal(int num_fields, const RecordType *type, const LogVal* const *vals) { + Val* idxval; + int position = 0; + + + if ( num_fields == 1 && type->FieldType(0)->Tag() != TYPE_RECORD ) { + idxval = LogValToVal(vals[0], type->FieldType(0)); + position = 1; + } else { + ListVal *l = new ListVal(TYPE_ANY); + for ( int j = 0 ; j < type->NumFields(); j++ ) { + if ( 
type->FieldType(j)->Tag() == TYPE_RECORD ) { + l->Append(LogValToRecordVal(vals, type->FieldType(j)->AsRecordType(), &position)); + } else { + l->Append(LogValToVal(vals[position], type->FieldType(j))); + position++; + } + } + idxval = l; + } + + //reporter->Error("Position: %d, num_fields: %d", position, num_fields); + assert ( position == num_fields ); + + return idxval; +} + + +void Manager::SendEntry(const InputReader* reader, int id, const LogVal* const *vals) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->InternalError("Unknown reader"); + return; + } + + if ( !i->HasFilter(id) ) { + reporter->InternalError("Unknown filter"); + return; + } + + if ( i->filters[id]->filter_type == TABLE_FILTER ) { + SendEntryTable(reader, id, vals); + } else if ( i->filters[id]->filter_type == EVENT_FILTER ) { + EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); + SendEventFilterEvent(reader, type, id, vals); + } else { + assert(false); + } + +} + +void Manager::SendEntryTable(const InputReader* reader, int id, const LogVal* const *vals) { + ReaderInfo *i = FindReader(reader); + + bool updated = false; + + assert(i); + assert(i->HasFilter(id)); + + assert(i->filters[id]->filter_type == TABLE_FILTER); + TableFilter* filter = (TableFilter*) i->filters[id]; + + //reporter->Error("Hashing %d index fields", i->num_idx_fields); + HashKey* idxhash = HashLogVals(filter->num_idx_fields, vals); + //reporter->Error("Result: %d", (uint64_t) idxhash->Hash()); + //reporter->Error("Hashing %d val fields", i->num_val_fields); + HashKey* valhash = 0; + if ( filter->num_val_fields > 0 ) + HashLogVals(filter->num_val_fields, vals+filter->num_idx_fields); + + //reporter->Error("Result: %d", (uint64_t) valhash->Hash()); + + //reporter->Error("received entry with idxhash %d and valhash %d", (uint64_t) idxhash->Hash(), (uint64_t) valhash->Hash()); + + InputHash *h = filter->lastDict->Lookup(idxhash); + if ( h != 0 ) { + // seen before + if ( filter->num_val_fields == 0 || h->valhash->Hash() == valhash->Hash() ) { + // ok, exact duplicate + filter->lastDict->Remove(idxhash); + filter->currDict->Insert(idxhash, h); + return; + } else { + assert( filter->num_val_fields > 0 ); + // updated + filter->lastDict->Remove(idxhash); + delete(h); + updated = true; + + } + } + + + Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals); + Val* valval; + + int position = filter->num_idx_fields; + if ( filter->num_val_fields == 0 ) { + valval = 0; + } else if ( filter->num_val_fields == 1 && !filter->want_record ) { + valval = LogValToVal(vals[position], filter->rtype->FieldType(0)); + } else { + valval = LogValToRecordVal(vals, filter->rtype, &position); + } + + + Val* oldval = 0; + if ( updated == true ) { + assert(filter->num_val_fields > 0); + // in that case, we need the old value to send the event (if we send an event). + oldval = filter->tab->Lookup(idxval); + } + + + // call filter first to determine if we really add / change the entry + if ( filter->pred ) { + EnumVal* ev; + //Ref(idxval); + int startpos = 0; + Val* predidx = LogValToRecordVal(vals, filter->itype, &startpos); + Ref(valval); + + if ( updated ) { + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); + } else { + ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); + } + + val_list vl( 2 + (filter->num_val_fields > 0) ); // 2 if we don't have values, 3 otherwise. 
+ vl.append(ev); + vl.append(predidx); + if ( filter->num_val_fields > 0 ) + vl.append(valval); + + Val* v = filter->pred->Call(&vl); + bool result = v->AsBool(); + Unref(v); + + if ( result == false ) { + if ( !updated ) { + // throw away. Hence - we quit. And remove the entry from the current dictionary... + delete(filter->currDict->RemoveEntry(idxhash)); + return; + } else { + // keep old one + filter->currDict->Insert(idxhash, h); + return; + } + } + + } + + + //i->tab->Assign(idxval, valval); + HashKey* k = filter->tab->ComputeHash(idxval); + if ( !k ) { + reporter->InternalError("could not hash"); + return; + } + + filter->tab->Assign(idxval, k, valval); + + InputHash* ih = new InputHash(); + k = filter->tab->ComputeHash(idxval); + ih->idxkey = k; + ih->valhash = valhash; + //i->tab->Delete(k); + + filter->currDict->Insert(idxhash, ih); + + if ( filter->event ) { + EnumVal* ev; + Ref(idxval); + + if ( updated ) { // in case of update send back the old value. + assert ( filter->num_val_fields > 0 ); + ev = new EnumVal(BifEnum::Input::EVENT_CHANGED, BifType::Enum::Input::Event); + assert ( oldval != 0 ); + Ref(oldval); + SendEvent(filter->event, 3, ev, idxval, oldval); + } else { + ev = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); + Ref(valval); + if ( filter->num_val_fields == 0 ) { + SendEvent(filter->event, 3, ev, idxval); + } else { + SendEvent(filter->event, 3, ev, idxval, valval); + } + } + } +} + + +void Manager::EndCurrentSend(const InputReader* reader, int id) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->InternalError("Unknown reader"); + return; + } + + assert(i->HasFilter(id)); + + if ( i->filters[id]->filter_type == EVENT_FILTER ) { + // nothing to do.. + return; + } + + assert(i->filters[id]->filter_type == TABLE_FILTER); + TableFilter* filter = (TableFilter*) i->filters[id]; + + // lastdict contains all deleted entries and should be empty apart from that + IterCookie *c = filter->lastDict->InitForIteration(); + filter->lastDict->MakeRobustCookie(c); + InputHash* ih; + HashKey *lastDictIdxKey; + //while ( ( ih = i->lastDict->NextEntry(c) ) ) { + while ( ( ih = filter->lastDict->NextEntry(lastDictIdxKey, c) ) ) { + + ListVal * idx = 0; + Val *val = 0; + + if ( filter->pred || filter->event ) { + idx = filter->tab->RecoverIndex(ih->idxkey); + assert(idx != 0); + val = filter->tab->Lookup(idx); + assert(val != 0); + } + + if ( filter->pred ) { + + bool doBreak = false; + // ask predicate, if we want to expire this element... + + EnumVal* ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); + //Ref(idx); + int startpos = 0; + Val* predidx = ListValToRecordVal(idx, filter->itype, &startpos); + Ref(val); + + val_list vl(3); + vl.append(ev); + vl.append(predidx); + vl.append(val); + Val* v = filter->pred->Call(&vl); + bool result = v->AsBool(); + Unref(v); + + if ( result == false ) { + // Keep it. Hence - we quit and simply go to the next entry of lastDict + // ah well - and we have to add the entry to currDict... 
+ filter->currDict->Insert(lastDictIdxKey, filter->lastDict->RemoveEntry(lastDictIdxKey)); + continue; + } + + + } + + if ( filter->event ) { + Ref(idx); + Ref(val); + EnumVal *ev = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); + SendEvent(filter->event, 3, ev, idx, val); + } + + filter->tab->Delete(ih->idxkey); + filter->lastDict->Remove(lastDictIdxKey); // deletex in next line + delete(ih); + } + + filter->lastDict->Clear(); // should be empty->->-> but->->-> well->->-> who knows->->-> + delete(filter->lastDict); + + filter->lastDict = filter->currDict; + filter->currDict = new PDict(InputHash); +} + +void Manager::Put(const InputReader* reader, int id, const LogVal* const *vals) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->InternalError("Unknown reader"); + return; + } + + if ( !i->HasFilter(id) ) { + reporter->InternalError("Unknown filter"); + return; + } + + if ( i->filters[id]->filter_type == TABLE_FILTER ) { + PutTable(reader, id, vals); + } else if ( i->filters[id]->filter_type == EVENT_FILTER ) { + EnumVal *type = new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event); + SendEventFilterEvent(reader, type, id, vals); + } else { + assert(false); + } + +} + +void Manager::SendEventFilterEvent(const InputReader* reader, EnumVal* type, int id, const LogVal* const *vals) { + ReaderInfo *i = FindReader(reader); + + bool updated = false; + + assert(i); + assert(i->HasFilter(id)); + + assert(i->filters[id]->filter_type == EVENT_FILTER); + EventFilter* filter = (EventFilter*) i->filters[id]; + + Val *val; + list out_vals; + // no tracking, send everything with a new event... + //out_vals.push_back(new EnumVal(BifEnum::Input::EVENT_NEW, BifType::Enum::Input::Event)); + out_vals.push_back(type); + + int position = 0; + if ( filter->want_record ) { + RecordVal * r = LogValToRecordVal(vals, filter->fields, &position); + out_vals.push_back(r); + } else { + for ( int j = 0; j < filter->fields->NumFields(); j++) { + Val* val = 0; + if ( filter->fields->FieldType(j)->Tag() == TYPE_RECORD ) { + val = LogValToRecordVal(vals, filter->fields->FieldType(j)->AsRecordType(), &position); + } else { + val = LogValToVal(vals[position], filter->fields->FieldType(j)); + position++; + } + out_vals.push_back(val); + } + } + + SendEvent(filter->event, out_vals); + +} + +void Manager::PutTable(const InputReader* reader, int id, const LogVal* const *vals) { + ReaderInfo *i = FindReader(reader); + + assert(i); + assert(i->HasFilter(id)); + + assert(i->filters[id]->filter_type == TABLE_FILTER); + TableFilter* filter = (TableFilter*) i->filters[id]; + + Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals); + Val* valval; + + int position = filter->num_idx_fields; + if ( filter->num_val_fields == 0 ) { + valval = 0; + } else if ( filter->num_val_fields == 1 && !filter->want_record ) { + valval = LogValToVal(vals[filter->num_idx_fields], filter->rtype->FieldType(filter->num_idx_fields)); + } else { + valval = LogValToRecordVal(vals, filter->rtype, &position); + } + + filter->tab->Assign(idxval, valval); +} + +void Manager::Clear(const InputReader* reader, int id) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->InternalError("Unknown reader"); + return; + } + + assert(i->HasFilter(id)); + + assert(i->filters[id]->filter_type == TABLE_FILTER); + TableFilter* filter = (TableFilter*) i->filters[id]; + + filter->tab->RemoveAll(); +} + +bool Manager::Delete(const InputReader* reader, int id, const LogVal* const 
*vals) { + ReaderInfo *i = FindReader(reader); + if ( i == 0 ) { + reporter->InternalError("Unknown reader"); + return false; + } + + assert(i->HasFilter(id)); + + if ( i->filters[id]->filter_type == TABLE_FILTER ) { + TableFilter* filter = (TableFilter*) i->filters[id]; + Val* idxval = LogValToIndexVal(filter->num_idx_fields, filter->itype, vals); + return( filter->tab->Delete(idxval) != 0 ); + } else if ( i->filters[id]->filter_type == EVENT_FILTER ) { + EnumVal *type = new EnumVal(BifEnum::Input::EVENT_REMOVED, BifType::Enum::Input::Event); + SendEventFilterEvent(reader, type, id, vals); + return true; + } else { + assert(false); + return false; + } +} + +void Manager::Error(InputReader* reader, const char* msg) +{ + reporter->Error("error with input reader for %s: %s", reader->Source().c_str(), msg); +} + +bool Manager::SendEvent(const string& name, const int num_vals, const LogVal* const *vals) +{ + EventHandler* handler = event_registry->Lookup(name.c_str()); + if ( handler == 0 ) { + reporter->Error("Event %s not found", name.c_str()); + return false; + } + + RecordType *type = handler->FType()->Args(); + int num_event_vals = type->NumFields(); + if ( num_vals != num_event_vals ) { + reporter->Error("Wrong number of values for event %s", name.c_str()); + return false; + } + + val_list* vl = new val_list; + for ( int i = 0; i < num_vals; i++) { + vl->append(LogValToVal(vals[i], type->FieldType(i))); + } + + mgr.Dispatch(new Event(handler, vl)); + + return true; +} + +void Manager::SendEvent(EventHandlerPtr ev, const int numvals, ...) +{ + val_list* vl = new val_list; + + va_list lP; + va_start(lP, numvals); + for ( int i = 0; i < numvals; i++ ) + { + vl->append( va_arg(lP, Val*) ); + } + va_end(lP); + + mgr.QueueEvent(ev, vl, SOURCE_LOCAL); +} + +void Manager::SendEvent(EventHandlerPtr ev, list events) +{ + val_list* vl = new val_list; + + for ( list::iterator i = events.begin(); i != events.end(); i++ ) { + vl->append( *i ); + } + + mgr.QueueEvent(ev, vl, SOURCE_LOCAL); +} + + +RecordVal* Manager::ListValToRecordVal(ListVal* list, RecordType *request_type, int* position) { + RecordVal* rec = new RecordVal(request_type->AsRecordType()); + + int maxpos = list->Length(); + + for ( int i = 0; i < request_type->NumFields(); i++ ) { + assert ( (*position) <= maxpos ); + + Val* fieldVal = 0; + if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) { + fieldVal = ListValToRecordVal(list, request_type->FieldType(i)->AsRecordType(), position); + } else { + fieldVal = list->Index(*position); + (*position)++; + } + + rec->Assign(i, fieldVal); + } + + return rec; +} + + + +RecordVal* Manager::LogValToRecordVal(const LogVal* const *vals, RecordType *request_type, int* position) { + if ( position == 0 ) { + reporter->InternalError("Need position"); + return 0; + } + + /* + if ( request_type->Tag() != TYPE_RECORD ) { + reporter->InternalError("I only work with records"); + return 0; + } */ + + + RecordVal* rec = new RecordVal(request_type->AsRecordType()); + for ( int i = 0; i < request_type->NumFields(); i++ ) { + + Val* fieldVal = 0; + if ( request_type->FieldType(i)->Tag() == TYPE_RECORD ) { + fieldVal = LogValToRecordVal(vals, request_type->FieldType(i)->AsRecordType(), position); + } else { + fieldVal = LogValToVal(vals[*position], request_type->FieldType(i)); + (*position)++; + } + + rec->Assign(i, fieldVal); + } + + return rec; + +} + + +int Manager::GetLogValLength(const LogVal* val) { + int length = 0; + + switch (val->type) { + case TYPE_BOOL: + case TYPE_INT: + length += 
sizeof(val->val.int_val); + break; + + case TYPE_COUNT: + case TYPE_COUNTER: + length += sizeof(val->val.uint_val); + break; + + case TYPE_PORT: + length += sizeof(val->val.port_val.port); + length += sizeof(val->val.port_val.proto); + break; + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + length += sizeof(val->val.double_val); + break; + + case TYPE_STRING: + case TYPE_ENUM: + { + length += val->val.string_val->size(); + break; + } + + case TYPE_ADDR: + length += NUM_ADDR_WORDS*sizeof(uint32_t); + break; + + case TYPE_SUBNET: + length += sizeof(val->val.subnet_val.width); + length += sizeof(val->val.subnet_val.net); + break; + + case TYPE_TABLE: { + for ( int i = 0; i < val->val.set_val.size; i++ ) { + length += GetLogValLength(val->val.set_val.vals[i]); + } + break; + } + + case TYPE_VECTOR: { + int j = val->val.vector_val.size; + for ( int i = 0; i < j; i++ ) { + length += GetLogValLength(val->val.vector_val.vals[i]); + } + break; + } + + default: + reporter->InternalError("unsupported type %d for GetLogValLength", val->type); + } + + return length; + +} + +int Manager::CopyLogVal(char *data, const int startpos, const LogVal* val) { + switch ( val->type ) { + case TYPE_BOOL: + case TYPE_INT: + //reporter->Error("Adding field content to pos %d: %lld", val->val.int_val, startpos); + memcpy(data+startpos, (const void*) &(val->val.int_val), sizeof(val->val.int_val)); + //*(data+startpos) = val->val.int_val; + return sizeof(val->val.int_val); + break; + + case TYPE_COUNT: + case TYPE_COUNTER: + //*(data+startpos) = val->val.uint_val; + memcpy(data+startpos, (const void*) &(val->val.uint_val), sizeof(val->val.uint_val)); + return sizeof(val->val.uint_val); + break; + + case TYPE_PORT: { + int length = 0; + memcpy(data+startpos, (const void*) &(val->val.port_val.port), sizeof(val->val.port_val.port)); + length += sizeof(val->val.port_val.port); + memcpy(data+startpos, (const void*) &(val->val.port_val.proto), sizeof(val->val.port_val.proto)); + length += sizeof(val->val.port_val.proto); + return length; + break; + } + + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + //*(data+startpos) = val->val.double_val; + memcpy(data+startpos, (const void*) &(val->val.double_val), sizeof(val->val.double_val)); + return sizeof(val->val.double_val); + break; + + case TYPE_STRING: + case TYPE_ENUM: + { + memcpy(data+startpos, val->val.string_val->c_str(), val->val.string_val->length()); + return val->val.string_val->size(); + break; + } + + case TYPE_ADDR: + memcpy(data+startpos, val->val.addr_val, NUM_ADDR_WORDS*sizeof(uint32_t)); + return NUM_ADDR_WORDS*sizeof(uint32_t); + break; + + case TYPE_SUBNET: { + int length = 0; + memcpy(data+startpos,(const char*) &(val->val.subnet_val.width), sizeof(val->val.subnet_val.width) ); + length += sizeof(val->val.subnet_val.width); + memcpy(data+startpos, (const char*) &(val->val.subnet_val.net), sizeof(val->val.subnet_val.net) ); + length += sizeof(val->val.subnet_val.net); + return length; + break; + } + + case TYPE_TABLE: { + int length = 0; + for ( int i = 0; i < val->val.set_val.size; i++ ) { + length += CopyLogVal(data, startpos+length, val->val.set_val.vals[i]); + } + return length; + break; + } + + case TYPE_VECTOR: { + int length = 0; + int j = val->val.vector_val.size; + for ( int i = 0; i < j; i++ ) { + length += CopyLogVal(data, startpos+length, val->val.vector_val.vals[i]); + } + return length; + break; + } + + default: + reporter->InternalError("unsupported type %d for CopyLogVal", val->type); + return 0; + } + + 
reporter->InternalError("internal error"); + return 0; + +} + +HashKey* Manager::HashLogVals(const int num_elements, const LogVal* const *vals) { + int length = 0; + + for ( int i = 0; i < num_elements; i++ ) { + const LogVal* val = vals[i]; + length += GetLogValLength(val); + } + + //reporter->Error("Length: %d", length); + + int position = 0; + char *data = (char*) malloc(length); + if ( data == 0 ) { + reporter->InternalError("Could not malloc?"); + } + for ( int i = 0; i < num_elements; i++ ) { + const LogVal* val = vals[i]; + position += CopyLogVal(data, position, val); + } + + assert(position == length); + return new HashKey(data, length); + + +} + +Val* Manager::LogValToVal(const LogVal* val, BroType* request_type) { + + if ( request_type->Tag() != TYPE_ANY && request_type->Tag() != val->type ) { + reporter->InternalError("Typetags don't match: %d vs %d", request_type->Tag(), val->type); + return 0; + } + + if ( !val->present ) { + return 0; // unset field + } + + + switch ( val->type ) { + case TYPE_BOOL: + case TYPE_INT: + return new Val(val->val.int_val, val->type); + break; + + case TYPE_COUNT: + case TYPE_COUNTER: + return new Val(val->val.uint_val, val->type); + break; + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + return new Val(val->val.double_val, val->type); + break; + + case TYPE_STRING: + { + BroString *s = new BroString(*(val->val.string_val)); + return new StringVal(s); + break; + } + + case TYPE_PORT: + return new PortVal(val->val.port_val.port, val->val.port_val.proto); + break; + + case TYPE_ADDR: + return new AddrVal(val->val.addr_val); + break; + + case TYPE_SUBNET: + return new SubNetVal(val->val.subnet_val.net, val->val.subnet_val.width); + break; + + case TYPE_TABLE: { + // all entries have to have the same type... + BroType* type = request_type->AsTableType()->Indices()->PureType(); + TypeList* set_index = new TypeList(type->Ref()); + set_index->Append(type->Ref()); + SetType* s = new SetType(set_index, 0); + TableVal* t = new TableVal(s); + for ( int i = 0; i < val->val.set_val.size; i++ ) { + t->Assign(LogValToVal( val->val.set_val.vals[i], type ), 0); + } + return t; + break; + } + + case TYPE_VECTOR: { + // all entries have to have the same type... + BroType* type = request_type->AsVectorType()->YieldType(); + VectorType* vt = new VectorType(type->Ref()); + VectorVal* v = new VectorVal(vt); + for ( int i = 0; i < val->val.vector_val.size; i++ ) { + v->Assign(i, LogValToVal( val->val.set_val.vals[i], type ), 0); + } + return v; + + } + + case TYPE_ENUM: { + // well, this is kind of stupid, because EnumType just mangles the module name and the var name together again... + // but well + string module = extract_module_name(val->val.string_val->c_str()); + string var = extract_var_name(val->val.string_val->c_str()); + bro_int_t index = request_type->AsEnumType()->Lookup(module, var.c_str()); + if ( index == -1 ) { + reporter->InternalError("Value not found in enum mappimg. 
Module: %s, var: %s", module.c_str(), var.c_str()); + } + return new EnumVal(index, request_type->Ref()->AsEnumType() ); + break; + } + + + default: + reporter->InternalError("unsupported type for input_read"); + } + + + reporter->InternalError("Impossible error"); + return NULL; +} + +Manager::ReaderInfo* Manager::FindReader(const InputReader* reader) + { + for ( vector::iterator s = readers.begin(); s != readers.end(); ++s ) + { + if ( (*s)->reader && (*s)->reader == reader ) + { + return *s; + } + } + + return 0; + } + + +Manager::ReaderInfo* Manager::FindReader(const EnumVal* id) + { + for ( vector::iterator s = readers.begin(); s != readers.end(); ++s ) + { + if ( (*s)->id && (*s)->id->AsEnum() == id->AsEnum() ) + { + return *s; + } + } + + return 0; + } + + +string Manager::Hash(const string &input) { + unsigned char digest[16]; + hash_md5(input.length(), (const unsigned char*) input.c_str(), digest); + string out((const char*) digest, 16); + return out; +} + + diff --git a/src/input/Manager.h b/src/input/Manager.h new file mode 100644 index 0000000000..fe37efa08b --- /dev/null +++ b/src/input/Manager.h @@ -0,0 +1,90 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef INPUT_MANAGER_H +#define INPUT_MANAGER_H + +#include "../BroString.h" + +#include "../Val.h" +#include "../EventHandler.h" +#include "../RemoteSerializer.h" + +#include + +namespace input { + +class ReaderFrontend; + +class Manager { +public: + Manager(); + + ReaderFrontend* CreateStream(EnumVal* id, RecordVal* description); + bool ForceUpdate(const EnumVal* id); + bool RemoveStream(const EnumVal* id); + + bool AddTableFilter(EnumVal *id, RecordVal* filter); + bool RemoveTableFilter(EnumVal* id, const string &name); + + bool AddEventFilter(EnumVal *id, RecordVal* filter); + bool RemoveEventFilter(EnumVal* id, const string &name); + +protected: + + // Reports an error for the given reader. 
+ void Error(ReaderFrontend* reader, const char* msg); + + // for readers to write to input stream in direct mode (reporting new/deleted values directly) + void Put(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + void Clear(const ReaderFrontend* reader, int id); + bool Delete(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + + // for readers to write to input stream in indirect mode (manager is monitoring new/deleted values) + void SendEntry(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + void EndCurrentSend(const ReaderFrontend* reader, int id); + +private: + struct ReaderInfo; + + void SendEntryTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + void PutTable(const ReaderFrontend* reader, int id, const threading::Value* const *vals); + void SendEventFilterEvent(const ReaderFrontend* reader, EnumVal* type, int id, const threading::Value* const *vals); + + bool IsCompatibleType(BroType* t, bool atomic_only=false); + + bool UnrollRecordType(vector *fields, const RecordType *rec, const string& nameprepend); + + void SendEvent(EventHandlerPtr ev, const int numvals, ...); + void SendEvent(EventHandlerPtr ev, list events); + bool SendEvent(const string& name, const int num_vals, const threading::Value* const *vals); + + HashKey* HashValues(const int num_elements, const threading::Value* const *vals); + int GetValueLength(const threading::Value* val); + int CopyValue(char *data, const int startpos, const threading::Value* val); + + Val* ValueToVal(const threading::Value* val, BroType* request_type); + Val* ValueToIndexVal(int num_fields, const RecordType* type, const threading::Value* const *vals); + RecordVal* ValueToRecordVal(const threading::Value* const *vals, RecordType *request_type, int* position); + RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position); + + ReaderInfo* FindReader(const ReaderFrontend* reader); + ReaderInfo* FindReader(const EnumVal* id); + + vector readers; + + string Hash(const string &input); + + class Filter; + class TableFilter; + class EventFilter; + + enum FilterType { TABLE_FILTER, EVENT_FILTER }; +}; + + +} + +extern input::Manager* input_mgr; + + +#endif /* INPUT_MANAGER_H */ diff --git a/src/input/ReaderBackend.cc b/src/input/ReaderBackend.cc new file mode 100644 index 0000000000..b2bcedb2ad --- /dev/null +++ b/src/input/ReaderBackend.cc @@ -0,0 +1,124 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "InputReader.h" + +using threading::Value; +using threading::Field; + +namespace logging { + +InputReader::InputReader(ReaderFrontend *arg_frontend) :MsgThread() +{ + buf = 0; + buf_len = 1024; + disabled = true; // disabled will be set correcty in init. + + frontend = arg_frontend; + + SetName(frontend->Name()); +} + +InputReader::~InputReader() +{ + +} + +void InputReader::Error(const char *msg) +{ + input_mgr->Error(this, msg); +} + +void InputReader::Error(const string &msg) +{ + input_mgr->Error(this, msg.c_str()); +} + +void InputReader::Put(int id, const LogVal* const *val) +{ + input_mgr->Put(this, id, val); +} + +void InputReader::Clear(int id) +{ + input_mgr->Clear(this, id); +} + +void InputReader::Delete(int id, const LogVal* const *val) +{ + input_mgr->Delete(this, id, val); +} + + +bool InputReader::Init(string arg_source) +{ + source = arg_source; + + // disable if DoInit returns error. 
+ disabled = !DoInit(arg_source); + return !disabled; +} + +bool InputReader::AddFilter(int id, int arg_num_fields, + const LogField* const * arg_fields) +{ + return DoAddFilter(id, arg_num_fields, arg_fields); +} + +bool InputReader::RemoveFilter(int id) +{ + return DoRemoveFilter(id); +} + +void InputReader::Finish() +{ + DoFinish(); + disabled = true; +} + +bool InputReader::Update() +{ + return DoUpdate(); +} + +bool InputReader::SendEvent(const string& name, const int num_vals, const LogVal* const *vals) +{ + return input_mgr->SendEvent(name, num_vals, vals); +} + +// stolen from logwriter +const char* InputReader::Fmt(const char* format, ...) + { + if ( ! buf ) + buf = (char*) malloc(buf_len); + + va_list al; + va_start(al, format); + int n = safe_vsnprintf(buf, buf_len, format, al); + va_end(al); + + if ( (unsigned int) n >= buf_len ) + { // Not enough room, grow the buffer. + buf_len = n + 32; + buf = (char*) realloc(buf, buf_len); + + // Is it portable to restart? + va_start(al, format); + n = safe_vsnprintf(buf, buf_len, format, al); + va_end(al); + } + + return buf; + } + + +void InputReader::SendEntry(int id, const LogVal* const *vals) +{ + input_mgr->SendEntry(this, id, vals); +} + +void InputReader::EndCurrentSend(int id) +{ + input_mgr->EndCurrentSend(this, id); +} + +} diff --git a/src/input/ReaderBackend.h b/src/input/ReaderBackend.h new file mode 100644 index 0000000000..7d2640b4fd --- /dev/null +++ b/src/input/ReaderBackend.h @@ -0,0 +1,86 @@ +// See the file "COPYING" in the main distribution directory for copyright. +// +// Same notes about thread safety as in LogWriter.h apply. + + +#ifndef INPUT_READERBACKEND_H +#define INPUT_READERBACKEND_H + +#include "InputMgr.h" +#include "BroString.h" +#include "LogMgr.h" + +namespace input { + +class ReaderBackend : public threading::MsgThread { +public: + ReaderBackend(ReaderFrontend *frontend); + + virtual ~ReaderBackend(); + + bool Init(string arg_source); + + bool AddFilter( int id, int arg_num_fields, const LogField* const* fields ); + + bool RemoveFilter ( int id ); + + void Finish(); + + bool Update(); + +protected: + // Methods that have to be overwritten by the individual readers + virtual bool DoInit(string arg_sources) = 0; + + virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) = 0; + + virtual bool DoRemoveFilter( int id ) = 0; + + virtual void DoFinish() = 0; + + // update file contents to logmgr + virtual bool DoUpdate() = 0; + + // Reports an error to the user. + void Error(const string &msg); + void Error(const char *msg); + + // The following methods return the information as passed to Init(). + const string Source() const { return source; } + + // A thread-safe version of fmt(). (stolen from logwriter) + const char* Fmt(const char* format, ...); + + bool SendEvent(const string& name, const int num_vals, const LogVal* const *vals); + + // Content-sendinf-functions (simple mode). Including table-specific stuff that simply is not used if we have no table + void Put(int id, const LogVal* const *val); + void Delete(int id, const LogVal* const *val); + void Clear(int id); + + // Table-functions (tracking mode): Only changed lines are propagated. + void SendEntry(int id, const LogVal* const *vals); + void EndCurrentSend(int id); + + +private: + // Frontend that instantiated us. This object must not be access from + // this class, it's running in a different thread! 
+ ReaderFrontend* frontend; + + string source; + + // When an error occurs, this method is called to set a flag marking the + // writer as disabled. + + bool disabled; + bool Disabled() { return disabled; } + + // For implementing Fmt(). + char* buf; + unsigned int buf_len; +}; + +} + +#endif /* INPUT_READERBACKEND_H */ diff --git a/src/input/ReaderFrontend.cc b/src/input/ReaderFrontend.cc new file mode 100644 index 0000000000..44638d90b3 --- /dev/null +++ b/src/input/ReaderFrontend.cc @@ -0,0 +1,28 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef INPUT_READERFRONTEND_H +#define INPUT_READERFRONTEND_H + +#include "Manager.h" + +#include "threading/MsgThread.h" + +namespace logging { + +class ReaderBackend; + +class ReaderFrontend { + + ReaderFrontend(bro_int_t type); + + virtual ~ReaderFrontend(); + + +protected: + friend class Manager; +}; + +} + +#endif /* INPUT_READERFRONTEND_H */ + diff --git a/src/input/ReaderFrontend.h b/src/input/ReaderFrontend.h new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/input/readers/Ascii.cc b/src/input/readers/Ascii.cc new file mode 100644 index 0000000000..257cb4cf71 --- /dev/null +++ b/src/input/readers/Ascii.cc @@ -0,0 +1,457 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "InputReaderAscii.h" +#include "DebugLogger.h" +#include "NetVar.h" + +#include + +FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position) + : name(arg_name), type(arg_type) +{ + position = arg_position; + secondary_position = -1; +} + +FieldMapping::FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position) + : name(arg_name), type(arg_type), subtype(arg_subtype) +{ + position = arg_position; + secondary_position = -1; +} + +FieldMapping::FieldMapping(const FieldMapping& arg) + : name(arg.name), type(arg.type), subtype(arg.subtype) +{ + position = arg.position; + secondary_position = arg.secondary_position; +} + +FieldMapping FieldMapping::subType() { + return FieldMapping(name, subtype, position); +} + +InputReaderAscii::InputReaderAscii() +{ + file = 0; + + //keyMap = new map(); + + separator.assign( (const char*) BifConst::InputAscii::separator->Bytes(), BifConst::InputAscii::separator->Len()); + if ( separator.size() != 1 ) { + Error("separator length has to be 1. Separator will be truncated."); + } + + set_separator.assign( (const char*) BifConst::InputAscii::set_separator->Bytes(), BifConst::InputAscii::set_separator->Len()); + if ( set_separator.size() != 1 ) { + Error("set_separator length has to be 1. 
Separator will be truncated."); + } + + empty_field.assign( (const char*) BifConst::InputAscii::empty_field->Bytes(), BifConst::InputAscii::empty_field->Len()); + + unset_field.assign( (const char*) BifConst::InputAscii::unset_field->Bytes(), BifConst::InputAscii::unset_field->Len()); + +} + +InputReaderAscii::~InputReaderAscii() +{ + DoFinish(); + +} + +void InputReaderAscii::DoFinish() +{ + filters.empty(); + if ( file != 0 ) { + file->close(); + delete(file); + file = 0; + } +} + +bool InputReaderAscii::DoInit(string path) +{ + fname = path; + + file = new ifstream(path.c_str()); + if ( !file->is_open() ) { + Error(Fmt("cannot open %s", fname.c_str())); + return false; + } + + return true; +} + +bool InputReaderAscii::DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ) { + if ( HasFilter(id) ) { + return false; // no, we don't want to add this a second time + } + + Filter f; + f.num_fields = arg_num_fields; + f.fields = fields; + + filters[id] = f; + + return true; +} + +bool InputReaderAscii::DoRemoveFilter ( int id ) { + if (!HasFilter(id) ) { + return false; + } + + assert ( filters.erase(id) == 1 ); + + return true; +} + + +bool InputReaderAscii::HasFilter(int id) { + map::iterator it = filters.find(id); + if ( it == filters.end() ) { + return false; + } + return true; +} + + +bool InputReaderAscii::ReadHeader() { + // try to read the header line... + string line; + if ( !GetLine(line) ) { + Error("could not read first line"); + return false; + } + + map fields; + + // construcr list of field names. + istringstream splitstream(line); + int pos=0; + while ( splitstream ) { + string s; + if ( !getline(splitstream, s, separator[0])) + break; + + fields[s] = pos; + pos++; + } + + + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + + for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { + const LogField* field = (*it).second.fields[i]; + + map::iterator fit = fields.find(field->name); + if ( fit == fields.end() ) { + Error(Fmt("Did not find requested field %s in input data file.", field->name.c_str())); + return false; + } + + + FieldMapping f(field->name, field->type, field->subtype, fields[field->name]); + if ( field->secondary_name != "" ) { + map::iterator fit2 = fields.find(field->secondary_name); + if ( fit2 == fields.end() ) { + Error(Fmt("Could not find requested port type field %s in input data file.", field->secondary_name.c_str())); + return false; + } + f.secondary_position = fields[field->secondary_name]; + } + (*it).second.columnMap.push_back(f); + } + + } + + // well, that seems to have worked... + return true; +} + +bool InputReaderAscii::GetLine(string& str) { + while ( getline(*file, str) ) { + if ( str[0] != '#' ) { + return true; + } + + if ( str.compare(0,8, "#fields\t") == 0 ) { + str = str.substr(8); + return true; + } + } + + return false; +} + +TransportProto InputReaderAscii::StringToProto(const string &proto) { + if ( proto == "unknown" ) { + return TRANSPORT_UNKNOWN; + } else if ( proto == "tcp" ) { + return TRANSPORT_TCP; + } else if ( proto == "udp" ) { + return TRANSPORT_UDP; + } else if ( proto == "icmp" ) { + return TRANSPORT_ICMP; + } + + //assert(false); + + reporter->Error("Tried to parse invalid/unknown protocol: %s", proto.c_str()); + + return TRANSPORT_UNKNOWN; +} + +LogVal* InputReaderAscii::EntryToVal(string s, FieldMapping field) { + + LogVal* val = new LogVal(field.type, true); + + if ( s.compare(unset_field) == 0 ) { // field is not set... 
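+		// An unset column is reported as a LogVal with present == false; the caller
+		// has to check 'present' before touching the union. Note that the LogVal
+		// allocated above is not freed on this path.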
+ return new LogVal(field.type, false); + } + + switch ( field.type ) { + case TYPE_ENUM: + case TYPE_STRING: + val->val.string_val = new string(s); + break; + + case TYPE_BOOL: + if ( s == "T" ) { + val->val.int_val = 1; + } else if ( s == "F" ) { + val->val.int_val = 0; + } else { + Error(Fmt("Invalid value for boolean: %s", s.c_str())); + return false; + } + break; + + case TYPE_INT: + val->val.int_val = atoi(s.c_str()); + break; + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + val->val.double_val = atof(s.c_str()); + break; + + case TYPE_COUNT: + case TYPE_COUNTER: + val->val.uint_val = atoi(s.c_str()); + break; + + case TYPE_PORT: + val->val.port_val.port = atoi(s.c_str()); + val->val.port_val.proto = TRANSPORT_UNKNOWN; + break; + + case TYPE_SUBNET: { + int pos = s.find("/"); + string width = s.substr(pos+1); + val->val.subnet_val.width = atoi(width.c_str()); + string addr = s.substr(0, pos); + s = addr; + // NOTE: dotted_to_addr BREAKS THREAD SAFETY! it uses reporter. + // Solve this some other time.... +#ifdef BROv6 + if ( s.find(':') != s.npos ) { + uint32* addr = dotted_to_addr6(s.c_str()); + copy_addr(val->val.subnet_val.net, addr); + delete addr; + } else { + val->val.subnet_val.net[0] = val->val.subnet_val.net[1] = val->val.subnet_val.net[2] = 0; + val->val.subnet_val.net[3] = dotted_to_addr(s.c_str()); + } +#else + val->val.subnet_val.net = dotted_to_addr(s.c_str()); +#endif + break; + + } + case TYPE_ADDR: { + // NOTE: dottet_to_addr BREAKS THREAD SAFETY! it uses reporter. + // Solve this some other time.... +#ifdef BROv6 + if ( s.find(':') != s.npos ) { + uint32* addr = dotted_to_addr6(s.c_str()); + copy_addr(val->val.addr_val, addr); + delete addr; + } else { + val->val.addr_val[0] = val->val.addr_val[1] = val->val.addr_val[2] = 0; + val->val.addr_val[3] = dotted_to_addr(s.c_str()); + } +#else + uint32 t = dotted_to_addr(s.c_str()); + copy_addr(&t, val->val.addr_val); +#endif + break; + } + + case TYPE_TABLE: + case TYPE_VECTOR: + // First - common initialization + // Then - initialization for table. + // Then - initialization for vector. + // Then - common stuff + { + // how many entries do we have... + unsigned int length = 1; + for ( unsigned int i = 0; i < s.size(); i++ ) + if ( s[i] == ',') length++; + + unsigned int pos = 0; + + if ( s.compare(empty_field) == 0 ) + length = 0; + + LogVal** lvals = new LogVal* [length]; + + if ( field.type == TYPE_TABLE ) { + val->val.set_val.vals = lvals; + val->val.set_val.size = length; + } else if ( field.type == TYPE_VECTOR ) { + val->val.vector_val.vals = lvals; + val->val.vector_val.size = length; + } else { + assert(false); + } + + if ( length == 0 ) + break; //empty + + istringstream splitstream(s); + while ( splitstream ) { + string element; + + if ( !getline(splitstream, element, set_separator[0]) ) + break; + + if ( pos >= length ) { + Error(Fmt("Internal error while parsing set. pos %d >= length %d. 
Element: %s", pos, length, element.c_str())); + break; + } + + LogVal* newval = EntryToVal(element, field.subType()); + if ( newval == 0 ) { + Error("Error while reading set"); + return 0; + } + lvals[pos] = newval; + + pos++; + + } + + + if ( pos != length ) { + Error("Internal error while parsing set: did not find all elements"); + return 0; + } + + break; + } + + + default: + Error(Fmt("unsupported field format %d for %s", field.type, + field.name.c_str())); + return 0; + } + + return val; + +} + +// read the entire file and send appropriate thingies back to InputMgr +bool InputReaderAscii::DoUpdate() { + + // dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad) + if ( file && file->is_open() ) { + file->close(); + } + file = new ifstream(fname.c_str()); + if ( !file->is_open() ) { + Error(Fmt("cannot open %s", fname.c_str())); + return false; + } + // + + // file->seekg(0, ios::beg); // do not forget clear. + + + if ( ReadHeader() == false ) { + return false; + } + + string line; + while ( GetLine(line ) ) { + // split on tabs + istringstream splitstream(line); + + map stringfields; + int pos = 0; + while ( splitstream ) { + string s; + if ( !getline(splitstream, s, separator[0]) ) + break; + + stringfields[pos] = s; + pos++; + } + + pos--; // for easy comparisons of max element. + + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + + LogVal** fields = new LogVal*[(*it).second.num_fields]; + + int fpos = 0; + for ( vector::iterator fit = (*it).second.columnMap.begin(); + fit != (*it).second.columnMap.end(); + fit++ ){ + + if ( (*fit).position > pos || (*fit).secondary_position > pos ) { + Error(Fmt("Not enough fields in line %s. Found %d fields, want positions %d and %d", line.c_str(), pos, (*fit).position, (*fit).secondary_position)); + return false; + } + + LogVal* val = EntryToVal(stringfields[(*fit).position], *fit); + if ( val == 0 ) { + return false; + } + + if ( (*fit).secondary_position != -1 ) { + // we have a port definition :) + assert(val->type == TYPE_PORT ); + // Error(Fmt("Got type %d != PORT with secondary position!", val->type)); + + val->val.port_val.proto = StringToProto(stringfields[(*fit).secondary_position]); + } + + fields[fpos] = val; + + fpos++; + } + + assert ( (unsigned int) fpos == (*it).second.num_fields ); + + SendEntry((*it).first, fields); + + for ( unsigned int i = 0; i < (*it).second.num_fields; i++ ) { + delete fields[i]; + } + delete [] fields; + } + + } + + //file->clear(); // remove end of file evil bits + //file->seekg(0, ios::beg); // and seek to start. + + for ( map::iterator it = filters.begin(); it != filters.end(); it++ ) { + EndCurrentSend((*it).first); + } + return true; +} diff --git a/src/input/readers/Ascii.h b/src/input/readers/Ascii.h new file mode 100644 index 0000000000..1747f983e4 --- /dev/null +++ b/src/input/readers/Ascii.h @@ -0,0 +1,88 @@ +// See the file "COPYING" in the main distribution directory for copyright. 
+ +#ifndef INPUTREADERASCII_H +#define INPUTREADERASCII_H + +#include "InputReader.h" +#include +#include +#include + +// Description for input field mapping +struct FieldMapping { + string name; + TypeTag type; + // internal type for sets and vectors + TypeTag subtype; + int position; + // for ports: pos of the second field + int secondary_position; + + FieldMapping(const string& arg_name, const TypeTag& arg_type, int arg_position); + FieldMapping(const string& arg_name, const TypeTag& arg_type, const TypeTag& arg_subtype, int arg_position); + FieldMapping(const FieldMapping& arg); + FieldMapping() { position = -1; secondary_position = -1; } + + FieldMapping subType(); + //bool IsEmpty() { return position == -1; } +}; + + +class InputReaderAscii : public InputReader { +public: + InputReaderAscii(); + ~InputReaderAscii(); + + static InputReader* Instantiate() { return new InputReaderAscii; } + +protected: + + virtual bool DoInit(string path); + + virtual bool DoAddFilter( int id, int arg_num_fields, const LogField* const* fields ); + + virtual bool DoRemoveFilter ( int id ); + + virtual void DoFinish(); + + virtual bool DoUpdate(); + +private: + + struct Filter { + unsigned int num_fields; + + const LogField* const * fields; // raw mapping + + // map columns in the file to columns to send back to the manager + vector columnMap; + + }; + + bool HasFilter(int id); + + TransportProto StringToProto(const string &proto); + + bool ReadHeader(); + LogVal* EntryToVal(string s, FieldMapping type); + + bool GetLine(string& str); + + ifstream* file; + string fname; + + map filters; + + // Options set from the script-level. + string separator; + + string set_separator; + + string empty_field; + + string unset_field; + +}; + + +#endif /* INPUTREADERASCII_H */ diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 2333d6c612..add10b3f10 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -850,7 +850,8 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty) break; case TYPE_PORT: - lval->val.uint_val = val->AsPortVal()->Port(); + lval->val.port_val.port = val->AsPortVal()->Port(); + lval->val.port_val.proto = val->AsPortVal()->PortType(); break; case TYPE_SUBNET: diff --git a/src/logging/writers/Ascii.cc b/src/logging/writers/Ascii.cc index fc6832afea..062a4f74f7 100644 --- a/src/logging/writers/Ascii.cc +++ b/src/logging/writers/Ascii.cc @@ -169,10 +169,13 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field) case TYPE_COUNT: case TYPE_COUNTER: - case TYPE_PORT: desc->Add(val->val.uint_val); break; + case TYPE_PORT: + desc->Add(val->val.port_val.port); + break; + case TYPE_SUBNET: desc->Add(dotted_addr(val->val.subnet_val.net)); desc->Add("/"); diff --git a/src/main.cc b/src/main.cc index e224910db4..333f9e6770 100644 --- a/src/main.cc +++ b/src/main.cc @@ -52,6 +52,8 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void); #include "logging/Manager.h" #include "logging/writers/Ascii.h" +#include "input/Manager.h" + #include "binpac_bro.h" Brofiler brofiler; @@ -79,6 +81,7 @@ DNS_Mgr* dns_mgr; TimerMgr* timer_mgr; logging::Manager* log_mgr = 0; threading::Manager* thread_mgr = 0; +input::Manager* input_mgr = 0; Stmt* stmts; EventHandlerPtr net_done = 0; RuleMatcher* rule_matcher = 0; @@ -743,6 +746,7 @@ int main(int argc, char** argv) remote_serializer = new RemoteSerializer(); event_registry = new EventRegistry(); log_mgr = new logging::Manager(); + input_mgr = new input::Manager(); if ( events_file ) event_player = new EventPlayer(events_file); 
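The two logging hunks above change how ports travel through the threading layer: instead of a bare uint_val, a port now carries its transport protocol alongside the number (port_val.port / port_val.proto), which is what lets the ASCII reader and the port.bro test below round-trip values such as 80/tcp. A minimal standalone C++ sketch of that pairing follows; the type and function names here are illustrative stand-ins, not Bro's actual ones.

// Illustrative sketch only: a (number, protocol) port pair and how a writer
// would render it, mirroring port_val.port / port_val.proto in the hunks above.
#include <cstdint>
#include <cstdio>

enum TransportProto { TRANSPORT_UNKNOWN, TRANSPORT_TCP, TRANSPORT_UDP, TRANSPORT_ICMP };

struct PortField {          // hypothetical name; stands in for the port_val member
	uint32_t port;
	TransportProto proto;
};

static const char* ProtoName(TransportProto p)
	{
	switch ( p ) {
		case TRANSPORT_TCP:  return "tcp";
		case TRANSPORT_UDP:  return "udp";
		case TRANSPORT_ICMP: return "icmp";
		default:             return "unknown";
	}
	}

int main()
	{
	PortField p = { 80, TRANSPORT_TCP };
	printf("%u/%s\n", (unsigned) p.port, ProtoName(p.proto));   // prints "80/tcp"
	return 0;
	}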
diff --git a/src/parse.y b/src/parse.y index 1b05171ecf..2761c4c13b 100644 --- a/src/parse.y +++ b/src/parse.y @@ -2,7 +2,7 @@ // See the file "COPYING" in the main distribution directory for copyright. %} -%expect 87 +%expect 90 %token TOK_ADD TOK_ADD_TO TOK_ADDR TOK_ANY %token TOK_ATENDIF TOK_ATELSE TOK_ATIF TOK_ATIFDEF TOK_ATIFNDEF @@ -24,6 +24,7 @@ %token TOK_ATTR_PERSISTENT TOK_ATTR_SYNCHRONIZED %token TOK_ATTR_DISABLE_PRINT_HOOK TOK_ATTR_RAW_OUTPUT TOK_ATTR_MERGEABLE %token TOK_ATTR_PRIORITY TOK_ATTR_GROUP TOK_ATTR_LOG TOK_ATTR_ERROR_HANDLER +%token TOK_ATTR_TYPE_COLUMN %token TOK_DEBUG @@ -1312,6 +1313,8 @@ attr: { $$ = new Attr(ATTR_PRIORITY, $3); } | TOK_ATTR_GROUP '=' expr { $$ = new Attr(ATTR_GROUP, $3); } + | TOK_ATTR_TYPE_COLUMN '=' expr + { $$ = new Attr(ATTR_TYPE_COLUMN, $3); } | TOK_ATTR_LOG { $$ = new Attr(ATTR_LOG); } | TOK_ATTR_ERROR_HANDLER diff --git a/src/scan.l b/src/scan.l index 4914783c44..d836a8d626 100644 --- a/src/scan.l +++ b/src/scan.l @@ -315,6 +315,7 @@ when return TOK_WHEN; &optional return TOK_ATTR_OPTIONAL; &persistent return TOK_ATTR_PERSISTENT; &priority return TOK_ATTR_PRIORITY; +&type_column return TOK_ATTR_TYPE_COLUMN; &read_expire return TOK_ATTR_EXPIRE_READ; &redef return TOK_ATTR_REDEF; &rotate_interval return TOK_ATTR_ROTATE_INTERVAL; diff --git a/src/types.bif b/src/types.bif index 4657584a90..9256fe3bd0 100644 --- a/src/types.bif +++ b/src/types.bif @@ -168,4 +168,21 @@ enum ID %{ Unknown, %} +module Input; + +enum Reader %{ + READER_DEFAULT, + READER_ASCII, +%} + +enum Event %{ + EVENT_NEW, + EVENT_CHANGED, + EVENT_REMOVED, +%} + +enum ID %{ + Unknown, +%} + module GLOBAL; diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index d43367f300..77d2f43e92 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -19,4 +19,8 @@ scripts/base/init-bare.bro scripts/base/frameworks/logging/./postprocessors/./scp.bro scripts/base/frameworks/logging/./postprocessors/./sftp.bro scripts/base/frameworks/logging/./writers/ascii.bro + scripts/base/frameworks/input/__load__.bro + scripts/base/frameworks/input/./main.bro + build/src/base/input.bif.bro + scripts/base/frameworks/input/./readers/ascii.bro scripts/policy/misc/loaded-scripts.bro diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 92deb62edb..354ce4d76c 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -19,6 +19,10 @@ scripts/base/init-bare.bro scripts/base/frameworks/logging/./postprocessors/./scp.bro scripts/base/frameworks/logging/./postprocessors/./sftp.bro scripts/base/frameworks/logging/./writers/ascii.bro + scripts/base/frameworks/input/__load__.bro + scripts/base/frameworks/input/./main.bro + build/src/base/input.bif.bro + scripts/base/frameworks/input/./readers/ascii.bro scripts/base/init-default.bro scripts/base/utils/site.bro scripts/base/utils/./patterns.bro diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.basic/out b/testing/btest/Baseline/scripts.base.frameworks.input.basic/out new file mode 100644 index 0000000000..ebac1866b6 --- /dev/null +++ 
b/testing/btest/Baseline/scripts.base.frameworks.input.basic/out @@ -0,0 +1,14 @@ +{ +[-42] = [b=T, e=SSH::LOG, c=21, p=123/unknown, sn=10.0.0.0/24, a=1.2.3.4, d=3.14, t=1315801931.273616, iv=100.0, s=hurz, sc={ +2, +4, +1, +3 +}, ss={ +CC, +AA, +BB +}, se={ + +}, vc=[10, 20, 30], ve=[]] +} diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.event/out b/testing/btest/Baseline/scripts.base.frameworks.input.event/out new file mode 100644 index 0000000000..e32a2aea00 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.event/out @@ -0,0 +1,21 @@ +Input::EVENT_NEW +1 +T +Input::EVENT_NEW +2 +T +Input::EVENT_NEW +3 +F +Input::EVENT_NEW +4 +F +Input::EVENT_NEW +5 +F +Input::EVENT_NEW +6 +F +Input::EVENT_NEW +7 +T diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.onecolumn-norecord/out b/testing/btest/Baseline/scripts.base.frameworks.input.onecolumn-norecord/out new file mode 100644 index 0000000000..bbce48f4f6 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.onecolumn-norecord/out @@ -0,0 +1,3 @@ +{ +[-42] = T +} diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.onecolumn-record/out b/testing/btest/Baseline/scripts.base.frameworks.input.onecolumn-record/out new file mode 100644 index 0000000000..3f9af35c59 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.onecolumn-record/out @@ -0,0 +1,3 @@ +{ +[-42] = [b=T] +} diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.port/out b/testing/btest/Baseline/scripts.base.frameworks.input.port/out new file mode 100644 index 0000000000..858551aa2f --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.port/out @@ -0,0 +1,3 @@ +[p=80/tcp] +[p=52/udp] +[p=30/unknown] diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.predicate/out b/testing/btest/Baseline/scripts.base.frameworks.input.predicate/out new file mode 100644 index 0000000000..d805f804d8 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.predicate/out @@ -0,0 +1,7 @@ +VALID +VALID +VALID +VALID +VALID +VALID +VALID diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out b/testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out new file mode 100644 index 0000000000..e32a2aea00 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.tableevent/out @@ -0,0 +1,21 @@ +Input::EVENT_NEW +1 +T +Input::EVENT_NEW +2 +T +Input::EVENT_NEW +3 +F +Input::EVENT_NEW +4 +F +Input::EVENT_NEW +5 +F +Input::EVENT_NEW +6 +F +Input::EVENT_NEW +7 +T diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.twofilters/out b/testing/btest/Baseline/scripts.base.frameworks.input.twofilters/out new file mode 100644 index 0000000000..5b1ee5e983 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.twofilters/out @@ -0,0 +1,15 @@ +VALID +VALID +VALID +VALID +VALID +VALID +VALID +MARK +VALID +VALID +VALID +VALID +VALID +VALID +VALID diff --git a/testing/btest/btest.cfg b/testing/btest/btest.cfg index 2126e733e7..c0d3c1fd20 100644 --- a/testing/btest/btest.cfg +++ b/testing/btest/btest.cfg @@ -3,7 +3,7 @@ TestDirs = doc bifs language core scripts istate coverage TmpDir = %(testbase)s/.tmp BaselineDir = %(testbase)s/Baseline IgnoreDirs = .svn CVS .tmp -IgnoreFiles = *.tmp *.swp #* *.trace +IgnoreFiles = *.tmp *.swp #* *.trace .DS_Store [environment] BROPATH=`bash -c %(testbase)s/../../build/bro-path-dev` diff --git a/testing/btest/scripts/base/frameworks/input/basic.bro 
b/testing/btest/scripts/base/frameworks/input/basic.bro new file mode 100644 index 0000000000..d1b6659eb6 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/basic.bro @@ -0,0 +1,54 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#separator \x09 +#path ssh +#fields b i e c p sn a d t iv s sc ss se vc ve f +#types bool int enum count port subnet addr double time interval string table table table vector vector func +T -42 SSH::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1315801931.273616 100.000000 hurz 2,4,1,3 CC,AA,BB EMPTY 10,20,30 EMPTY SSH::foo\x0a{ \x0aif (0 < SSH::i) \x0a\x09return (Foo);\x0aelse\x0a\x09return (Bar);\x0a\x0a} +@TEST-END-FILE + +redef InputAscii::empty_field = "EMPTY"; + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Idx: record { + i: int; +}; + +type Val: record { + b: bool; + e: Log::ID; + c: count; + p: port; + sn: subnet; + a: addr; + d: double; + t: time; + iv: interval; + s: string; + sc: set[count]; + ss: set[string]; + se: set[string]; + vc: vector of int; + ve: vector of int; +}; + +global servers: table[int] of Val = table(); + +event bro_init() +{ + # first read in the old stuff into the table... + Input::create_stream(A::INPUT, [$source="input.log"]); + Input::add_tablefilter(A::INPUT, [$name="ssh", $idx=Idx, $val=Val, $destination=servers]); + Input::force_update(A::INPUT); + print servers; + Input::remove_tablefilter(A::INPUT, "ssh"); + Input::remove_stream(A::INPUT); +} diff --git a/testing/btest/scripts/base/frameworks/input/event.bro b/testing/btest/scripts/base/frameworks/input/event.bro new file mode 100644 index 0000000000..a07f0934a0 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/event.bro @@ -0,0 +1,42 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#separator \x09 +#path ssh +#fields i b +#types int bool +1 T +2 T +3 F +4 F +5 F +6 F +7 T +@TEST-END-FILE + + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Val: record { + i: int; + b: bool; +}; + +event line(tpe: Input::Event, i: int, b: bool) { + print tpe; + print i; + print b; +} + +event bro_init() +{ + Input::create_stream(A::INPUT, [$source="input.log"]); + Input::add_eventfilter(A::INPUT, [$name="input", $fields=Val, $ev=line]); + Input::force_update(A::INPUT); +} diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro new file mode 100644 index 0000000000..88838cc8d6 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-norecord.bro @@ -0,0 +1,38 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#separator \x09 +#path ssh +#fields b i +#types bool int +T -42 +@TEST-END-FILE + +redef InputAscii::empty_field = "EMPTY"; + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Idx: record { + i: int; +}; + +type Val: record { + b: bool; +}; + +global servers: table[int] of Val = table(); + +event bro_init() +{ + # first read in the old stuff into the table... 
+ Input::create_stream(A::INPUT, [$source="input.log"]); + Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F]); + Input::force_update(A::INPUT); + print servers; +} diff --git a/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro new file mode 100644 index 0000000000..fc4d862cd3 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/onecolumn-record.bro @@ -0,0 +1,38 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#separator \x09 +#path ssh +#fields b i +#types bool int +T -42 +@TEST-END-FILE + +redef InputAscii::empty_field = "EMPTY"; + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Idx: record { + i: int; +}; + +type Val: record { + b: bool; +}; + +global servers: table[int] of Val = table(); + +event bro_init() +{ + # first read in the old stuff into the table... + Input::create_stream(A::INPUT, [$source="input.log"]); + Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]); + Input::force_update(A::INPUT); + print servers; +} diff --git a/testing/btest/scripts/base/frameworks/input/port.bro b/testing/btest/scripts/base/frameworks/input/port.bro new file mode 100644 index 0000000000..c14892ae36 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/port.bro @@ -0,0 +1,41 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#fields i p t +1.2.3.4 80 tcp +1.2.3.5 52 udp +1.2.3.6 30 unknown +@TEST-END-FILE + +redef InputAscii::empty_field = "EMPTY"; + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Idx: record { + i: addr; +}; + +type Val: record { + p: port &type_column="t"; +}; + +global servers: table[addr] of Val = table(); + +event bro_init() +{ + # first read in the old stuff into the table... + Input::create_stream(A::INPUT, [$source="input.log"]); + Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers]); + Input::force_update(A::INPUT); + print servers[1.2.3.4]; + print servers[1.2.3.5]; + print servers[1.2.3.6]; + Input::remove_tablefilter(A::INPUT, "input"); + Input::remove_stream(A::INPUT); +} diff --git a/testing/btest/scripts/base/frameworks/input/predicate.bro b/testing/btest/scripts/base/frameworks/input/predicate.bro new file mode 100644 index 0000000000..5e6bae7b62 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/predicate.bro @@ -0,0 +1,66 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#separator \x09 +#path ssh +#fields i b +#types int bool +1 T +2 T +3 F +4 F +5 F +6 F +7 T +@TEST-END-FILE + +redef InputAscii::empty_field = "EMPTY"; + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Idx: record { + i: int; +}; + +type Val: record { + b: bool; +}; + +global servers: table[int] of Val = table(); + +event bro_init() +{ + # first read in the old stuff into the table... 
+ Input::create_stream(A::INPUT, [$source="input.log"]); + Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=servers, $want_record=F, + $pred(typ: Input::Event, left: Idx, right: bool) = { return right; } + ]); + Input::force_update(A::INPUT); + if ( 1 in servers ) { + print "VALID"; + } + if ( 2 in servers ) { + print "VALID"; + } + if ( !(3 in servers) ) { + print "VALID"; + } + if ( !(4 in servers) ) { + print "VALID"; + } + if ( !(5 in servers) ) { + print "VALID"; + } + if ( !(6 in servers) ) { + print "VALID"; + } + if ( 7 in servers ) { + print "VALID"; + } +} diff --git a/testing/btest/scripts/base/frameworks/input/tableevent.bro b/testing/btest/scripts/base/frameworks/input/tableevent.bro new file mode 100644 index 0000000000..36e8171689 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/tableevent.bro @@ -0,0 +1,48 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#separator \x09 +#path ssh +#fields i b +#types int bool +1 T +2 T +3 F +4 F +5 F +6 F +7 T +@TEST-END-FILE + +redef InputAscii::empty_field = "EMPTY"; + +module A; + +export { + redef enum Log::ID += { LOG }; +} + +type Idx: record { + i: int; +}; + +type Val: record { + b: bool; +}; + +global destination: table[int] of Val = table(); + +event line(tpe: Input::Event, left: Idx, right: bool) { + print tpe; + print left; + print right; +} + +event bro_init() +{ + Input::create_stream(A::LOG, [$source="input.log"]); + Input::add_tablefilter(A::LOG, [$name="input", $idx=Idx, $val=Val, $destination=destination, $want_record=F,$ev=line]); + Input::force_update(A::LOG); +} diff --git a/testing/btest/scripts/base/frameworks/input/twofilters.bro b/testing/btest/scripts/base/frameworks/input/twofilters.bro new file mode 100644 index 0000000000..5af664e0e9 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/twofilters.bro @@ -0,0 +1,95 @@ +# +# @TEST-EXEC: bro %INPUT >out +# @TEST-EXEC: btest-diff out + +@TEST-START-FILE input.log +#separator \x09 +#path ssh +#fields i b +#types int bool +1 T +2 T +3 F +4 F +5 F +6 F +7 T +@TEST-END-FILE + +redef InputAscii::empty_field = "EMPTY"; + +module A; + +export { + redef enum Input::ID += { INPUT }; +} + +type Idx: record { + i: int; +}; + +type Val: record { + b: bool; +}; + +global destination1: table[int] of Val = table(); +global destination2: table[int] of Val = table(); + +event bro_init() +{ + # first read in the old stuff into the table... 
+ Input::create_stream(A::INPUT, [$source="input.log"]); + Input::add_tablefilter(A::INPUT, [$name="input", $idx=Idx, $val=Val, $destination=destination1, $want_record=F, + $pred(typ: Input::Event, left: Idx, right: bool) = { return right; } + ]); + Input::add_tablefilter(A::INPUT, [$name="input2",$idx=Idx, $val=Val, $destination=destination2]); + + Input::force_update(A::INPUT); + if ( 1 in destination1 ) { + print "VALID"; + } + if ( 2 in destination1 ) { + print "VALID"; + } + if ( !(3 in destination1) ) { + print "VALID"; + } + if ( !(4 in destination1) ) { + print "VALID"; + } + if ( !(5 in destination1) ) { + print "VALID"; + } + if ( !(6 in destination1) ) { + print "VALID"; + } + if ( 7 in destination1 ) { + print "VALID"; + } + + print "MARK"; + + if ( 2 in destination2 ) { + print "VALID"; + } + if ( 2 in destination2 ) { + print "VALID"; + } + if ( 3 in destination2 ) { + print "VALID"; + } + if ( 4 in destination2 ) { + print "VALID"; + } + if ( 5 in destination2 ) { + print "VALID"; + } + if ( 6 in destination2 ) { + print "VALID"; + } + if ( 7 in destination2 ) { + print "VALID"; + } + + +}
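The twofilters test above attaches two table filters to one stream and uses a predicate to keep the F rows out of the first destination while the second destination receives every row. A standalone C++ sketch of that fan-out-with-predicate idea follows; the names and types are made up for illustration and this is not the manager's actual code.

// Sketch only: one input source, several filters, each filter may veto rows
// before they reach its destination table.
#include <cstdio>
#include <functional>
#include <map>
#include <vector>

struct Row { int i; bool b; };

struct Filter {
	const char* name;
	std::function<bool(const Row&)> pred;   // empty means "accept every row"
	std::map<int, bool> destination;
};

int main()
	{
	std::vector<Row> rows = { {1,true}, {2,true}, {3,false}, {4,false},
	                          {5,false}, {6,false}, {7,true} };

	Filter only_true = { "input",  [](const Row& r) { return r.b; }, {} };
	Filter all_rows  = { "input2", nullptr, {} };

	for ( const Row& r : rows )
		for ( Filter* f : { &only_true, &all_rows } )
			if ( ! f->pred || f->pred(r) )
				f->destination[r.i] = r.b;

	// only_true ends up with 3 entries (1, 2, 7); all_rows with all 7.
	printf("%s: %zu entries, %s: %zu entries\n",
	       only_true.name, only_true.destination.size(),
	       all_rows.name, all_rows.destination.size());
	return 0;
	}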