Mirror of https://github.com/zeek/zeek.git, synced 2025-10-10 10:38:20 +00:00

commit 464732bfce

Merge remote-tracking branch 'origin/topic/bernhard/input-threads-merge'

* origin/topic/bernhard/input-threads-merge:
  disable streaming reads from executed commands.
  automatically delete disabled input streams
  small documentation fixes
  Documentation

11 changed files with 406 additions and 352 deletions
CHANGES | 12

@@ -1,4 +1,16 @@
+2.0-622 | 2012-06-15 15:38:43 -0700
+
+  * Input framework updates. (Bernhard Amann)
+
+    - Disable streaming reads from executed commands. This lead to
+      hanging Bros because pclose apparently can wait for eternity if
+      things go wrong.
+
+    - Automatically delete disabled input streams.
+
+    - Documentation.
+
 2.0-614 | 2012-06-15 15:19:49 -0700
 
   * Remove an old, unused diff canonifier. (Jon Siwek)
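The first CHANGES item above can be made concrete with a short script sketch. This is an editor's illustration, not part of the commit; it uses the input framework's documented ``add_event`` interface, and the trailing-``|`` convention for reading the output of an executed command via the ``RAW`` reader is an assumption about the contemporary API rather than something shown in this diff.

.. code:: bro

    # Reading the output of an executed command with the RAW reader.
    # After this change such sources may be read once, but not streamed:
    # a misbehaving command could make pclose() block forever.
    Input::add_event([$source="tail /var/log/messages |", $name="cmd",
                      $reader=Input::READER_RAW, $fields=Val, $ev=line,
                      $mode=Input::MANUAL]);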
VERSION | 2

@@ -1 +1 @@
-2.0-614
+2.0-622
doc/input.rst | 424

@@ -1,93 +1,347 @@
-=====================
-Loading Data into Bro
-=====================
+==============================================
+Loading Data into Bro with the Input Framework
+==============================================
 
 .. rst-class:: opening
 
-    Bro comes with a flexible input interface that allows to read
-    previously stored data. Data is either read into bro tables or
-    sent to scripts using events.
-    This document describes how the input framework can be used.
+    Bro now features a flexible input framework that allows users
+    to import data into Bro. Data is either read into Bro tables or
+    converted to events which can then be handled by scripts.
+
+    The input framework is merged into the git master and we
+    will give a short summary on how to use it.
+    The input framework is automatically compiled and installed
+    together with Bro. The interface to it is exposed via the
+    scripting layer.
+
+    This document gives the most common examples. For more complex
+    scenarios it is worthwhile to take a look at the unit tests in
+    ``testing/btest/scripts/base/frameworks/input/``.
 
 .. contents::
 
-Terminology
-===========
-
-Bro's input framework is built around three main abstracts, that are
-very similar to the abstracts used in the logging framework:
-
-Input Streams
-    An input stream corresponds to a single input source
-    (usually a textfile). It defined the information necessary
-    to find the source (e.g. the filename), the reader that it used
-    to get data from it (see below).
-    It also defines exactly what data is read from the input source.
-    There are two different kind of streams, event streams and table
-    streams.
-    By default, event streams generate an event for each line read
-    from the input source.
-    Table streams on the other hand read the input source in a bro
-    table for easy later access.
-
-Readers
-    A reader defines the input format for the specific input stream.
-    At the moment, Bro comes with two types of reader. The default reader is READER_ASCII,
-    which can read the tab seperated ASCII logfiles that were generated by the
-    logging framework.
-    READER_RAW can files containing records separated by a character(like e.g. newline) and send
-    one event per line.
-
-Event Streams
-=============
-
-For examples, please look at the unit tests in
-``testing/btest/scripts/base/frameworks/input/``.
-
-Event Streams are streams that generate an event for each line in of the input source.
-
-For example, a simple stream retrieving the fields ``i`` and ``b`` from an inputSource
-could be defined as follows:
+Reading Data into Tables
+========================
+
+Probably the most interesting use-case of the input framework is to
+read data into a Bro table.
+
+By default, the input framework reads the data in the same format
+as it is written by the logging framework in Bro - a tab-separated
+ASCII file.
+
+We will show the ways to read files into Bro with a simple example.
+For this example we assume that we want to import data from a blacklist
+that contains server IP addresses as well as the timestamp and the reason
+for the block.
+
+An example input file could look like this:
+
+::
+
+    #fields ip      timestamp       reason
+    192.168.17.1    1333252748      Malware host
+    192.168.27.2    1330235733      Botnet server
+    192.168.250.3   1333145108      Virus detected
+
+To read a file into a Bro table, two record types have to be defined.
+One contains the types and names of the columns that should constitute the
+table keys and the second contains the types and names of the columns that
+should constitute the table values.
+
+In our case, we want to be able to lookup IPs. Hence, our key record
+only contains the server IP. All other elements should be stored as
+the table content.
+
+The two records are defined as:
 
 .. code:: bro
 
-    type Val: record {
-        i: int;
-        b: bool;
+    type Idx: record {
+        ip: addr;
     };
 
-    event line(description: Input::EventDescription, tpe: Input::Event, i: int, b: bool) {
-        # work with event data
+    type Val: record {
+        timestamp: time;
+        reason: string;
+    };
+
+Note that the names of the fields in the record definitions have to correspond to
+the column names listed in the '#fields' line of the log file, in this case 'ip',
+'timestamp', and 'reason'.
+
+The log file is read into the table with a simple call of the add_table function:
+
+.. code:: bro
+
+    global blacklist: table[addr] of Val = table();
+
+    Input::add_table([$source="blacklist.file", $name="blacklist", $idx=Idx, $val=Val, $destination=blacklist]);
+    Input::remove("blacklist");
+
+With these three lines we first create an empty table that should contain the
+blacklist data and then instruct the Input framework to open an input stream
+named ``blacklist`` to read the data into the table. The third line removes the
+input stream again, because we do not need it any more after the data has been
+read.
+
+Because some data files can - potentially - be rather big, the input framework
+works asynchronously. A new thread is created for each new input stream.
+This thread opens the input data file, converts the data into a Bro format and
+sends it back to the main Bro thread.
+
+Because of this, the data is not immediately accessible. Depending on the
+size of the data source it might take from a few milliseconds up to a few seconds
+until all data is present in the table. Please note that this means that when Bro
+is running without an input source or on very short captured files, it might terminate
+before the data is present in the system (because Bro already handled all packets
+before the import thread finished).
+
+Subsequent calls to an input source are queued until the previous action has been
+completed. Because of this, it is, for example, possible to call ``add_table`` and
+``remove`` in two subsequent lines: the ``remove`` action will remain queued until
+the first read has been completed.
+
+Once the input framework finishes reading from a data source, it fires the ``update_finished``
+event. Once this event has been received all data from the input file is available
+in the table.
+
+.. code:: bro
+
+    event Input::update_finished(name: string, source: string) {
+        # now all data is in the table
+        print blacklist;
     }
 
-    event bro_init {
-        Input::add_event([$source="input.log", $name="input", $fields=Val, $ev=line]);
+The table can also already be used while the data is still being read - it just might
+not contain all lines in the input file when the event has not yet fired. After it has
+been populated it can be used like any other Bro table and blacklist entries can easily be
+tested:
+
+.. code:: bro
+
+    if ( 192.168.18.12 in blacklist )
+        # take action
+
+
+Re-reading and streaming data
+-----------------------------
+
+For many data sources, like for many blacklists, the source data is continually
+changing. For these cases, the Bro input framework supports several ways to
+deal with changing data files.
+
+The first, very basic method is an explicit refresh of an input stream. When an input
+stream is open, the function ``force_update`` can be called. This will trigger
+a complete refresh of the table; any changed elements from the file will be updated.
+After the update is finished the ``update_finished`` event will be raised.
+
+In our example the call would look like:
+
+.. code:: bro
+
+    Input::force_update("blacklist");
+
+The input framework also supports two automatic refresh modes. The first mode
+continually checks if a file has been changed. If the file has been changed, it
+is re-read and the data in the Bro table is updated to reflect the current state.
+Each time a change has been detected and all the new data has been read into the
+table, the ``update_finished`` event is raised.
+
+The second mode is a streaming mode. This mode assumes that the source data file
+is an append-only file to which new data is continually appended. Bro continually
+checks for new data at the end of the file and will add the new data to the table.
+If newer lines in the file have the same index as previous lines, they will overwrite
+the values in the output table.
+Because of the nature of streaming reads (data is continually added to the table),
+the ``update_finished`` event is never raised when using streaming reads.
+
+The reading mode can be selected by setting the ``mode`` option of the add_table call.
+Valid values are ``MANUAL`` (the default), ``REREAD`` and ``STREAM``.
+
+Hence, when adding ``$mode=Input::REREAD`` to the previous example, the blacklist
+table will always reflect the state of the blacklist input file.
+
+.. code:: bro
+
+    Input::add_table([$source="blacklist.file", $name="blacklist", $idx=Idx, $val=Val, $destination=blacklist, $mode=Input::REREAD]);
+
+Receiving change events
+-----------------------
+
+When re-reading files, it might be interesting to know exactly which lines in the source
+files have changed.
+
+For this reason, the input framework can raise an event each time a data item is added to,
+removed from or changed in a table.
+
+The event definition looks like this:
+
+.. code:: bro
+
+    event entry(description: Input::TableDescription, tpe: Input::Event, left: Idx, right: Val) {
+        # act on values
     }
 
-The fields that can be set for an event stream are:
-
-``want_record``
-    Boolean value, that defines if the event wants to receive the fields inside of
-    a single record value, or individually (default).
+The event has to be specified in ``$ev`` in the ``add_table`` call:
+
+.. code:: bro
+
+    Input::add_table([$source="blacklist.file", $name="blacklist", $idx=Idx, $val=Val, $destination=blacklist, $mode=Input::REREAD, $ev=entry]);
+
+The ``description`` field of the event contains the arguments that were originally supplied to the add_table call.
+Hence, the name of the stream can, for example, be accessed with ``description$name``. ``tpe`` is an enum containing
+the type of the change that occurred.
+
+It will contain ``Input::EVENT_NEW`` when a line that was not previously present
+in the table has been added. In this case ``left`` contains the index of the added table entry and ``right`` contains
+the values of the added entry.
+
+If a table entry that already was present is altered during the re-reading or streaming read of a file, ``tpe`` will contain
+``Input::EVENT_CHANGED``. In this case ``left`` contains the index of the changed table entry and ``right`` contains the
+values of the entry before the change. The reason for this is that the table already has been updated when the event is
+raised. The current value in the table can be ascertained by looking up the current table value. Hence it is possible to compare
+the new and the old value of the table.
+
+``tpe`` contains ``Input::REMOVED`` when a table element is removed because it was no longer present during a re-read.
+In this case ``left`` contains the index and ``right`` the values of the removed element.
+
+
+Filtering data during import
+----------------------------
+
+The input framework also allows a user to filter the data during the import. To this end, predicate functions are used. A predicate
+function is called before a new element is added/changed/removed from a table. The predicate can either accept or veto
+the change by returning true for an accepted change and false for a rejected change. Furthermore, it can alter the data
+before it is written to the table.
+
+The following example filter will reject adding entries to the table when they were generated over a month ago. It
+will accept all changes and all removals of values that are already present in the table.
+
+.. code:: bro
+
+    Input::add_table([$source="blacklist.file", $name="blacklist", $idx=Idx, $val=Val, $destination=blacklist, $mode=Input::REREAD,
+        $pred(typ: Input::Event, left: Idx, right: Val) = {
+            if ( typ != Input::EVENT_NEW ) {
+                return T;
+            }
+            return ( ( current_time() - right$timestamp ) < (30 day) );
+        }]);
+
+To change elements while they are being imported, the predicate function can manipulate ``left`` and ``right``. Note
+that predicate functions are called before the change is committed to the table. Hence, when a table element is changed ( ``tpe``
+is ``Input::EVENT_CHANGED`` ), ``left`` and ``right`` contain the new values, but the destination (``blacklist`` in our example)
+still contains the old values. This allows predicate functions to examine the changes between the old and the new version before
+deciding if they should be allowed.
+
+Different readers
+-----------------
+
+The input framework supports different kinds of readers for different kinds of source data files. At the moment, the default
+reader reads ASCII files formatted in the Bro log-file-format (tab-separated values). Bro currently comes with two
+other readers. The ``RAW`` reader reads a file that is split by a specified record separator (usually newline). The contents
+are returned line-by-line as strings; it can, for example, be used to read configuration files and the like and is probably
+only useful in the event mode and not for reading data to tables.
+
+Another included reader is the ``BENCHMARK`` reader, which is being used to optimize the speed of the input framework. It
+can generate arbitrary amounts of semi-random data in all Bro data types supported by the input framework.
+
+In the future, the input framework will get support for new data sources like, for example, different databases.
+
+Add_table options
+-----------------
+
+This section lists all possible options that can be used for the add_table function and gives
+a short explanation of their use. Most of the options already have been discussed in the
+previous sections.
+
+The possible fields that can be set for a table stream are:
 
 ``source``
     A mandatory string identifying the source of the data.
     For the ASCII reader this is the filename.
 
-``reader``
+``name``
+    A mandatory name for the stream that can later be used
+    to manipulate it further.
+
+``idx``
+    Record type that defines the index of the table.
+
+``val``
+    Record type that defines the values of the table.
+
+``reader``
     The reader used for this stream. Default is ``READER_ASCII``.
 
 ``mode``
     The mode in which the stream is opened. Possible values are ``MANUAL``, ``REREAD`` and ``STREAM``.
     Default is ``MANUAL``.
     ``MANUAL`` means that the file is not updated after it has been read. Changes to the file will not
-    be reflected in the data bro knows.
+    be reflected in the data Bro knows.
     ``REREAD`` means that the whole file is read again each time a change is found. This should be used for
     files that are mapped to a table where individual lines can change.
     ``STREAM`` means that the data from the file is streamed. Events / table entries will be generated as new
     data is added to the file.
 
+``destination``
+    The destination table.
+
+``ev``
+    Optional event that is raised, when values are added to, changed in or deleted from the table.
+    Events are passed an Input::Event description as the first argument, the index record as the second argument
+    and the values as the third argument.
+
+``pred``
+    Optional predicate, that can prevent entries from being added to the table and events from being sent.
+
+``want_record``
+    Boolean value, that defines if the event wants to receive the fields inside of
+    a single record value, or individually (default).
+    This can be used, if ``val`` is a record containing only one type. In this case,
+    if ``want_record`` is set to false, the table will contain elements of the type
+    contained in ``val``.
+
+Reading data to events
+======================
+
+The second supported mode of the input framework is reading data to Bro events instead
+of reading them to a table using event streams.
+
+Event streams work very similarly to table streams that were already discussed in much
+detail. To read the blacklist of the previous example into an event stream, the following
+Bro code could be used:
+
+.. code:: bro
+
+    type Val: record {
+        ip: addr;
+        timestamp: time;
+        reason: string;
+    };
+
+    event blacklistentry(description: Input::EventDescription, tpe: Input::Event, ip: addr, timestamp: time, reason: string) {
+        # work with event data
+    }
+
+    event bro_init() {
+        Input::add_event([$source="blacklist.file", $name="blacklist", $fields=Val, $ev=blacklistentry]);
+    }
+
+The main difference in the declaration of the event stream is that an event stream needs no
+separate index and value declarations -- instead, all source data types are provided in a single
+record definition.
+
+Apart from this, event streams work exactly the same as table streams and support most of the options
+that are also supported for table streams.
+
+The options that can be set when creating an event stream with ``add_event`` are:
+
+``source``
+    A mandatory string identifying the source of the data.
+    For the ASCII reader this is the filename.
 
 ``name``
     A mandatory name for the stream that can later be used
     to remove it.
@@ -102,82 +356,26 @@ The fields that can be set for an event stream are:
 followed by the data, either inside of a record (if ``want_record`` is set) or as
 individual fields.
 The Input::Event structure can contain information, if the received line is ``NEW``, has
-been ``CHANGED`` or ``DELETED``. Singe the ascii reader cannot track this information
+been ``CHANGED`` or ``DELETED``. Since the ASCII reader cannot track this information
 for event filters, the value is always ``NEW`` at the moment.
 
-
-Table Streams
-=============
-
-Table streams are the second, more complex type of input streams.
-
-Table streams store the information they read from an input source in a bro table. For example,
-when reading a file that contains ip addresses and connection attemt information one could use
-an approach similar to this:
-
-.. code:: bro
-
-    type Idx: record {
-        a: addr;
-    };
-
-    type Val: record {
-        tries: count;
-    };
-
-    global conn_attempts: table[addr] of count = table();
-
-    event bro_init {
-        Input::add_table([$source="input.txt", $name="input", $idx=Idx, $val=Val, $destination=conn_attempts]);
-    }
-
-The table conn_attempts will then contain the information about connection attemps.
-
-The possible fields that can be set for an table stream are:
-
-``want_record``
-    Boolean value, that defines if the event wants to receive the fields inside of
-    a single record value, or individually (default).
-
-``source``
-    A mandatory string identifying the source of the data.
-    For the ASCII reader this is the filename.
-
-``reader``
-    The reader used for this stream. Default is ``READER_ASCII``.
-
 ``mode``
     The mode in which the stream is opened. Possible values are ``MANUAL``, ``REREAD`` and ``STREAM``.
     Default is ``MANUAL``.
     ``MANUAL`` means that the file is not updated after it has been read. Changes to the file will not
-    be reflected in the data bro knows.
+    be reflected in the data Bro knows.
     ``REREAD`` means that the whole file is read again each time a change is found. This should be used for
     files that are mapped to a table where individual lines can change.
     ``STREAM`` means that the data from the file is streamed. Events / table entries will be generated as new
     data is added to the file.
 
-``name``
-    A mandatory name for the filter that can later be used
-    to manipulate it further.
-
-``idx``
-    Record type that defines the index of the table
-
-``val``
-    Record type that defines the values of the table
+``reader``
+    The reader used for this stream. Default is ``READER_ASCII``.
 
 ``want_record``
-    Defines if the values of the table should be stored as a record (default),
-    or as a simple value. Has to be set if Val contains more than one element.
-
-``destination``
-    The destination table
-
-``ev``
-    Optional event that is raised, when values are added to, changed in or deleted from the table.
-    Events are passed an Input::Event description as the first argument, the index record as the second argument
-    and the values as the third argument.
-
-``pred``
-    Optional predicate, that can prevent entries from being added to the table and events from being sent.
+    Boolean value, that defines if the event wants to receive the fields inside of
+    a single record value, or individually (default). If this is set to true, the
+    event will receive a single record of the type provided in ``fields``.
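The ``want_record`` entry in the new documentation above describes, but does not show, the single-column case. The following is an editor's sketch, not part of the commit; the field names are borrowed from the documentation's running example, and the behavior assumed is exactly what the new ``want_record`` text describes.

.. code:: bro

    type Idx: record {
        ip: addr;
    };

    # Val contains a single field, so with $want_record=F the table can
    # map addr directly to time instead of to a one-field record.
    type SingleVal: record {
        timestamp: time;
    };

    global last_seen: table[addr] of time = table();

    event bro_init() {
        Input::add_table([$source="blacklist.file", $name="last_seen",
                          $idx=Idx, $val=SingleVal, $destination=last_seen,
                          $want_record=F]);
    }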
@@ -689,16 +689,14 @@ bool Manager::IsCompatibleType(BroType* t, bool atomic_only)
 	}
 
 
-bool Manager::RemoveStream(const string &name)
+bool Manager::RemoveStream(Stream *i)
 	{
-	Stream *i = FindStream(name);
-
 	if ( i == 0 )
 		return false; // not found
 
 	if ( i->removed )
 		{
-		reporter->Error("Stream %s is already queued for removal. Ignoring remove.", name.c_str());
+		reporter->Error("Stream %s is already queued for removal. Ignoring remove.", i->name.c_str());
 		return false;
 		}
 
@@ -706,14 +704,24 @@ bool Manager::RemoveStream(const string &name)
 
 	i->reader->Close();
 
-#ifdef DEBUG
 	DBG_LOG(DBG_INPUT, "Successfully queued removal of stream %s",
-		name.c_str());
-#endif
+		i->name.c_str());
 
 	return true;
 	}
 
+bool Manager::RemoveStream(ReaderFrontend* frontend)
+	{
+	return RemoveStream(FindStream(frontend));
+	}
+
+
+bool Manager::RemoveStream(const string &name)
+	{
+	return RemoveStream(FindStream(name));
+	}
+
+
 bool Manager::RemoveStreamContinuation(ReaderFrontend* reader)
 	{
 	Stream *i = FindStream(reader);
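The refactoring above turns ``Manager::RemoveStream`` into thin wrappers (by name, or by reader frontend) around a single ``Stream*``-based implementation. The script-level entry point is unchanged; as a sketch (stream name taken from the documentation's running example), removal still goes through the ``Input::remove`` BiF, which forwards to the by-name wrapper:

.. code:: bro

    # Input::remove() -> input.bif -> Manager::RemoveStream(const string&),
    # which now simply calls RemoveStream(FindStream(name)).
    Input::remove("blacklist");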
@@ -72,7 +72,7 @@ public:
 	/**
 	 * Deletes an existing input stream.
 	 *
-	 * @param id The enum value corresponding the input stream.
+	 * @param id The name of the input stream to be removed.
 	 *
 	 * This method corresponds directly to the internal BiF defined in
 	 * input.bif, which just forwards here.
@@ -88,6 +88,7 @@ protected:
 	friend class SendEntryMessage;
 	friend class EndCurrentSendMessage;
 	friend class ReaderClosedMessage;
+	friend class DisableMessage;
 
 	// For readers to write to input stream in direct mode (reporting
 	// new/deleted values directly). Functions take ownership of
@@ -119,11 +120,25 @@ protected:
 	// stream is still received.
 	bool RemoveStreamContinuation(ReaderFrontend* reader);
 
+	/**
+	 * Deletes an existing input stream.
+	 *
+	 * @param frontend pointer to the frontend of the input stream to be removed.
+	 *
+	 * This method is used by the reader backends to remove a reader when it fails
+	 * for some reason.
+	 */
+	bool RemoveStream(ReaderFrontend* frontend);
+
 private:
 	class Stream;
 	class TableStream;
 	class EventStream;
 
+	// Actual RemoveStream implementation -- the function's public and
+	// protected definitions are wrappers around this function.
+	bool RemoveStream(Stream* i);
+
 	bool CreateStream(Stream*, RecordVal* description);
 
 	// SendEntry implementation for Table stream.
@ -113,6 +113,7 @@ public:
|
||||||
|
|
||||||
virtual bool Process()
|
virtual bool Process()
|
||||||
{
|
{
|
||||||
|
Object()->SetDisable();
|
||||||
return input_mgr->RemoveStreamContinuation(Object());
|
return input_mgr->RemoveStreamContinuation(Object());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -129,6 +130,12 @@ public:
 	virtual bool Process()
 		{
 		Object()->SetDisable();
+		// And - because we do not need disabled objects any more -
+		// there is no way to re-enable them, so simply delete them.
+		// This avoids the problem of having to periodically check if
+		// there are any disabled readers out there. As soon as a
+		// reader disables itself, it deletes itself.
+		input_mgr->RemoveStream(Object());
 		return true;
 		}
 };
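The comment in this hunk states the policy: disabled readers can never be re-enabled, so deleting them the moment they disable themselves avoids any periodic sweep for dead readers. A simplified sketch of that disable-then-delete flow (stand-in types, not Bro's actual message classes):

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Illustrative stand-in for a reader thread object.
struct Reader {
	bool disabled = false;
	void SetDisable() { disabled = true; }
};

class Manager {
public:
	Reader* NewReader()
		{
		readers.push_back(std::make_unique<Reader>());
		return readers.back().get();
		}

	// Called when the reader reports that it has closed or failed.
	void RemoveStream(Reader* r)
		{
		for ( auto it = readers.begin(); it != readers.end(); ++it )
			if ( it->get() == r )
				{
				readers.erase(it); // frees the reader right away
				return;
				}
		}

	std::size_t Count() const { return readers.size(); }

private:
	std::vector<std::unique_ptr<Reader>> readers;
};

std::size_t demo()
	{
	Manager m;
	Reader* r = m.NewReader();
	r->SetDisable();   // reader disables itself...
	m.RemoveStream(r); // ...and the manager deletes it immediately
	return m.Count();
	}
```

Deleting eagerly trades a slightly more careful message handler for never having to ask "are there stale disabled readers lying around?".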
@@ -203,8 +210,7 @@ bool ReaderBackend::Init(string arg_source, ReaderMode arg_mode, const int arg_n
 void ReaderBackend::Close()
 	{
 	DoClose();
-	disabled = true;
-	DisableFrontend();
+	disabled = true; // frontend disables itself when it gets the Close-message.
 	SendOut(new ReaderClosedMessage(frontend));
 
 	if ( fields != 0 )
@@ -15,17 +15,24 @@ namespace input {
 	 */
 enum ReaderMode {
 	/**
-	 * TODO Bernhard.
+	 * Manual refresh reader mode. The reader will read the file once,
+	 * and send all read data back to the manager. After that, no automatic
+	 * refresh should happen. Manual refreshes can be triggered from the
+	 * scripting layer using force_update.
 	 */
 	MODE_MANUAL,
 
 	/**
-	 * TODO Bernhard.
+	 * Automatic rereading mode. The reader should monitor the
+	 * data source for changes continually. When the data source changes,
+	 * the whole file has to be resent using the SendEntry/EndCurrentSend functions.
 	 */
 	MODE_REREAD,
 
 	/**
-	 * TODO Bernhard.
+	 * Streaming reading mode. The reader should monitor the data source
+	 * for new appended data. When new data is appended it has to be sent
+	 * using the Put API functions.
 	 */
 	MODE_STREAM
 };
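The three modes documented above imply different behavior on each periodic check of the data source. A hedged sketch of that dispatch, mirroring the doc comments; the `Action` enum and function names are illustrative, not part of the actual API:

```cpp
enum ReaderMode { MODE_MANUAL, MODE_REREAD, MODE_STREAM };

enum Action {
	DO_NOTHING,    // MODE_MANUAL: only an explicit force_update triggers a read
	RESEND_ALL,    // MODE_REREAD: resend everything via SendEntry/EndCurrentSend
	SEND_APPENDED  // MODE_STREAM: send only newly appended data via Put
};

// What a reader should do when its heartbeat fires, per mode.
Action OnHeartbeat(ReaderMode mode, bool source_changed)
	{
	switch ( mode ) {
	case MODE_REREAD:
		return source_changed ? RESEND_ALL : DO_NOTHING;

	case MODE_STREAM:
		return source_changed ? SEND_APPENDED : DO_NOTHING;

	case MODE_MANUAL:
	default:
		return DO_NOTHING;
	}
	}
```

The key distinction is REREAD versus STREAM: both react to change, but REREAD replaces the whole data set while STREAM only delivers the appended tail.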
@@ -6,9 +6,6 @@
 
 #include "threading/MsgThread.h"
 
-// FIXME: cleanup of disabled inputreaders is missing. we need this, because
-// stuff can e.g. fail in init and might never be removed afterwards.
-
 namespace input {
 
 class InitMessage : public threading::InputMessage<ReaderBackend>
@@ -106,6 +103,7 @@ void ReaderFrontend::Close()
 		return;
 		}
 
+	disabled = true;
 	backend->SendIn(new CloseMessage(backend));
 	}
 
@@ -79,6 +79,9 @@ bool Raw::CloseInput()
 		InternalError(Fmt("Trying to close closed file for stream %s", fname.c_str()));
 		return false;
 		}
+#ifdef DEBUG
+	Debug(DBG_INPUT, "Raw reader starting close");
+#endif
 
 	delete in;
 
@@ -90,6 +93,10 @@ bool Raw::CloseInput()
 	in = NULL;
 	file = NULL;
 
+#ifdef DEBUG
+	Debug(DBG_INPUT, "Raw reader finished close");
+#endif
+
 	return true;
 	}
 
@@ -128,7 +135,7 @@ bool Raw::DoInit(string path, ReaderMode mode, int num_fields, const Field* cons
 		execute = true;
 		fname = path.substr(0, fname.length() - 1);
 
-		if ( (mode != MODE_MANUAL) && (mode != MODE_STREAM) )
+		if ( (mode != MODE_MANUAL) )
 			{
 			Error(Fmt("Unsupported read mode %d for source %s in execution mode",
 				mode, fname.c_str()));
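This hunk restricts executed commands (sources ending in `|`) to manual mode; per the commit message, streaming reads from commands could hang Bro because `pclose` may wait forever. A stand-alone POSIX sketch of the safe one-shot pattern that manual mode corresponds to (this is illustrative, not the Raw reader itself):

```cpp
#include <cstdio>
#include <string>

// MODE_STREAM would keep the pipe open indefinitely, and pclose() blocks
// until the child exits -- potentially forever if things go wrong. A
// manual, one-shot read drains the pipe first, so pclose() returns
// promptly once the command finishes.
std::string ReadCommandOnce(const char* cmd)
	{
	FILE* f = popen(cmd, "r");
	if ( ! f )
		return "";

	std::string out;
	char buf[256];
	while ( fgets(buf, sizeof(buf), f) )
		out += buf;

	pclose(f); // safe: the command has finished and the pipe is drained
	return out;
	}
```

With a long-lived streaming child (e.g. `tail -f`), the equivalent `pclose` call has no guarantee of ever returning, which is exactly the hang this commit avoids.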
@@ -254,8 +261,14 @@ bool Raw::DoHeartbeat(double network_time, double current_time)
 
 		case MODE_REREAD:
 		case MODE_STREAM:
+#ifdef DEBUG
+			Debug(DBG_INPUT, "Starting Heartbeat update");
+#endif
 			Update();	// call update and not DoUpdate, because update
 					// checks disabled.
+#ifdef DEBUG
+			Debug(DBG_INPUT, "Finished with heartbeat update");
+#endif
 			break;
 		default:
 			assert(false);
@@ -1,145 +0,0 @@
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-q3r3057fdf
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-sdfs\d
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-dfsdf
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-sdf
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-3rw43wRRERLlL#RWERERERE.
-[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
-{
-print A::outfile, A::description;
-print A::outfile, A::tpe;
-print A::outfile, A::s;
-A::try = A::try + 1;
-if (9 == A::try)
-{
-print A::outfile, done;
-close(A::outfile);
-Input::remove(input);
-}
-
-}]
-Input::EVENT_NEW
-
-done
@@ -1,58 +0,0 @@
-#
-# @TEST-EXEC: cp input1.log input.log
-# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
-# @TEST-EXEC: sleep 3
-# @TEST-EXEC: cat input2.log >> input.log
-# @TEST-EXEC: sleep 3
-# @TEST-EXEC: cat input3.log >> input.log
-# @TEST-EXEC: btest-bg-wait -k 3
-# @TEST-EXEC: btest-diff out
-
-@TEST-START-FILE input1.log
-sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
-@TEST-END-FILE
-
-@TEST-START-FILE input2.log
-DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
-q3r3057fdf
-@TEST-END-FILE
-
-@TEST-START-FILE input3.log
-sdfs\d
-
-dfsdf
-sdf
-3rw43wRRERLlL#RWERERERE.
-
-@TEST-END-FILE
-
-@load frameworks/communication/listen
-
-module A;
-
-type Val: record {
-	s: string;
-};
-
-global try: count;
-global outfile: file;
-
-event line(description: Input::EventDescription, tpe: Input::Event, s: string) {
-	print outfile, description;
-	print outfile, tpe;
-	print outfile, s;
-	try = try + 1;
-
-	if ( try == 9 ) {
-		print outfile, "done";
-		close(outfile);
-		Input::remove("input");
-	}
-}
-
-event bro_init()
-	{
-	outfile = open ("../out");
-	try = 0;
-	Input::add_event([$source="tail -f ../input.log |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line]);
-	}