Merge remote-tracking branch 'origin/topic/bernhard/input-end-of-data'

* origin/topic/bernhard/input-end-of-data:
  rename the update_finished event to end_of_data and make it fire in more cases.

Closes #894.
This commit is contained in:
Robin Sommer 2012-10-12 09:46:41 -07:00
commit 503412e472
32 changed files with 119 additions and 95 deletions

10
CHANGES
View file

@ -1,4 +1,14 @@
2.1-68 | 2012-10-12 09:46:41 -0700
* Rename the Input Framework's update_finished event to end_of_data.
It will now not only fire after table-reads have been completed,
but also after the last event of a whole-file-read (or
whole-db-read, etc.). (Bernhard Amann)
* Fix for DNS log problem when a DNS response is seen with 0 RRs.
(Seth Hall)
2.1-64 | 2012-10-12 09:36:41 -0700
* Teach --disable-dataseries/--disable-elasticsearch to ./configure.

5
NEWS
View file

@ -32,6 +32,11 @@ Changed Functionality
- "this" is no longer a reserved keyword.
- The Input Framework's update_finished event has been renamed to
end_of_data. It will now not only fire after table-reads have been
completed, but also after the last event of a whole-file-read (or
whole-db-read, etc.).
Bro 2.1
-------

View file

@ -1 +1 @@
2.1-64
2.1-68

View file

@ -98,12 +98,12 @@ been completed. Because of this, it is, for example, possible to call
will remain queued until the first read has been completed.
Once the input framework finishes reading from a data source, it fires
the ``update_finished`` event. Once this event has been received all data
the ``end_of_data`` event. Once this event has been received all data
from the input file is available in the table.
.. code:: bro
event Input::update_finished(name: string, source: string) {
event Input::end_of_data(name: string, source: string) {
# now all data is in the table
print blacklist;
}
@ -129,7 +129,7 @@ deal with changing data files.
The first, very basic method is an explicit refresh of an input stream. When
an input stream is open, the function ``force_update`` can be called. This
will trigger a complete refresh of the table; any changed elements from the
file will be updated. After the update is finished the ``update_finished``
file will be updated. After the update is finished the ``end_of_data``
event will be raised.
In our example the call would look like:
@ -142,7 +142,7 @@ The input framework also supports two automatic refresh modes. The first mode
continually checks if a file has been changed. If the file has been changed, it
is re-read and the data in the Bro table is updated to reflect the current
state. Each time a change has been detected and all the new data has been
read into the table, the ``update_finished`` event is raised.
read into the table, the ``end_of_data`` event is raised.
The second mode is a streaming mode. This mode assumes that the source data
file is an append-only file to which new data is continually appended. Bro
@ -150,7 +150,7 @@ continually checks for new data at the end of the file and will add the new
data to the table. If newer lines in the file have the same index as previous
lines, they will overwrite the values in the output table. Because of the
nature of streaming reads (data is continually added to the table),
the ``update_finished`` event is never raised when using streaming reads.
the ``end_of_data`` event is never raised when using streaming reads.
The reading mode can be selected by setting the ``mode`` option of the
add_table call. Valid values are ``MANUAL`` (the default), ``REREAD``

View file

@ -114,7 +114,8 @@ export {
## description: `TableDescription` record describing the source.
global add_event: function(description: Input::EventDescription) : bool;
## Remove a input stream. Returns true on success and false if the named stream was not found.
## Remove a input stream. Returns true on success and false if the named stream was
## not found.
##
## id: string value identifying the stream to be removed
global remove: function(id: string) : bool;
@ -125,8 +126,9 @@ export {
## id: string value identifying the stream
global force_update: function(id: string) : bool;
## Event that is called, when the update of a specific source is finished
global update_finished: event(name: string, source:string);
## Event that is called, when the end of a data source has been reached, including
## after an update.
global end_of_data: event(name: string, source:string);
}
@load base/input.bif

View file

@ -196,7 +196,7 @@ Manager::TableStream::~TableStream()
Manager::Manager()
{
update_finished = internal_handler("Input::update_finished");
end_of_data = internal_handler("Input::end_of_data");
}
Manager::~Manager()
@ -1172,8 +1172,12 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", i->name.c_str());
#endif
if ( i->stream_type == EVENT_STREAM ) // nothing to do..
if ( i->stream_type == EVENT_STREAM )
{
// just signal the end of the data source
SendEndOfData(i);
return;
}
assert(i->stream_type == TABLE_STREAM);
TableStream* stream = (TableStream*) i;
@ -1254,12 +1258,29 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
stream->currDict->SetDeleteFunc(input_hash_delete_func);
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event",
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s",
i->name.c_str());
#endif
// Send event that the current update is indeed finished.
SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source));
SendEndOfData(i);
}
void Manager::SendEndOfData(ReaderFrontend* reader)
{
Stream *i = FindStream(reader);
if ( i == 0 )
{
reporter->InternalError("Unknown reader in SendEndOfData");
return;
}
SendEndOfData(i);
}
void Manager::SendEndOfData(const Stream *i)
{
SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source));
}
void Manager::Put(ReaderFrontend* reader, Value* *vals)

View file

@ -89,6 +89,7 @@ protected:
friend class EndCurrentSendMessage;
friend class ReaderClosedMessage;
friend class DisableMessage;
friend class EndOfDataMessage;
// For readers to write to input stream in direct mode (reporting
// new/deleted values directly). Functions take ownership of
@ -96,6 +97,9 @@ protected:
void Put(ReaderFrontend* reader, threading::Value* *vals);
void Clear(ReaderFrontend* reader);
bool Delete(ReaderFrontend* reader, threading::Value* *vals);
// Trigger sending the End-of-Data event when the input source has
// finished reading. Just use in direct mode.
void SendEndOfData(ReaderFrontend* reader);
// For readers to write to input stream in indirect mode (manager is
// monitoring new/deleted values) Functions take ownership of
@ -154,7 +158,6 @@ private:
// equivalend in threading cannot be used, because we have support
// different types from the log framework
bool IsCompatibleType(BroType* t, bool atomic_only=false);
// Check if a record is made up of compatible types and return a list
// of all fields that are in the record in order. Recursively unrolls
// records
@ -164,6 +167,9 @@ private:
void SendEvent(EventHandlerPtr ev, const int numvals, ...);
void SendEvent(EventHandlerPtr ev, list<Val*> events);
// Implementation of SendEndOfData (send end_of_data event).
void SendEndOfData(const Stream *i);
// Call predicate function and return result.
bool CallPred(Func* pred_func, const int numvals, ...);
@ -200,7 +206,7 @@ private:
map<ReaderFrontend*, Stream*> readers;
EventHandlerPtr update_finished;
EventHandlerPtr end_of_data;
};

View file

@ -108,6 +108,20 @@ public:
private:
};
class EndOfDataMessage : public threading::OutputMessage<ReaderFrontend> {
public:
EndOfDataMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("EndOfData", reader) {}
virtual bool Process()
{
input_mgr->SendEndOfData(Object());
return true;
}
private:
};
class ReaderClosedMessage : public threading::OutputMessage<ReaderFrontend> {
public:
ReaderClosedMessage(ReaderFrontend* reader)
@ -183,6 +197,11 @@ void ReaderBackend::EndCurrentSend()
SendOut(new EndCurrentSendMessage(frontend));
}
void ReaderBackend::EndOfData()
{
SendOut(new EndOfDataMessage(frontend));
}
void ReaderBackend::SendEntry(Value* *vals)
{
SendOut(new SendEntryMessage(frontend, vals));

View file

@ -281,6 +281,16 @@ protected:
*/
void Clear();
/**
* Method telling the manager that we finished reading the current
* data source. Will trigger an end_of_data event.
*
* Note: When using SendEntry as the tracking mode this is triggered
* automatically by EndCurrentSend(). Only use if not using the
* tracking mode. Otherwise the event will be sent twice.
*/
void EndOfData();
// Content-sending-functions (tracking mode): Only changed lines are propagated.
/**

View file

@ -4,13 +4,6 @@ print outfile, A::description;
print outfile, A::tpe;
print outfile, A::i;
print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={
}]
@ -23,13 +16,6 @@ print outfile, A::description;
print outfile, A::tpe;
print outfile, A::i;
print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={
}]
@ -42,13 +28,6 @@ print outfile, A::description;
print outfile, A::tpe;
print outfile, A::i;
print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={
}]
@ -61,13 +40,6 @@ print outfile, A::description;
print outfile, A::tpe;
print outfile, A::i;
print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={
}]
@ -80,13 +52,6 @@ print outfile, A::description;
print outfile, A::tpe;
print outfile, A::i;
print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={
}]
@ -99,13 +64,6 @@ print outfile, A::description;
print outfile, A::tpe;
print outfile, A::i;
print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={
}]
@ -118,16 +76,10 @@ print outfile, A::description;
print outfile, A::tpe;
print outfile, A::i;
print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={
}]
Input::EVENT_NEW
7
T
End-of-data

View file

@ -55,7 +55,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
print outfile, to_count(servers[-42]$ns); # try to actually use a string. If null-termination is wrong this will fail.

View file

@ -37,7 +37,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
close(outfile);

View file

@ -73,7 +73,7 @@ event bro_init()
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, "==========SERVERS============";
print outfile, servers;

View file

@ -40,7 +40,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
close(outfile);

View file

@ -22,7 +22,6 @@
@load frameworks/communication/listen
global outfile: file;
global try: count;
module A;
@ -37,18 +36,18 @@ event line(description: Input::EventDescription, tpe: Input::Event, i: int, b: b
print outfile, tpe;
print outfile, i;
print outfile, b;
try = try + 1;
if ( try == 7 )
{
close(outfile);
terminate();
}
}
event bro_init()
{
try = 0;
outfile = open("../out");
Input::add_event([$source="../input.log", $name="input", $fields=Val, $ev=line, $want_record=F]);
Input::remove("input");
}
event Input::end_of_data(name: string, source:string)
{
print outfile, "End-of-data";
close(outfile);
terminate();
}

View file

@ -41,7 +41,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
terminate();

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("input");
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, servers;
close(outfile);

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("input");
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, servers;
close(outfile);

View file

@ -48,7 +48,7 @@ event bro_init()
Input::remove("input");
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, servers;
close(outfile);

View file

@ -43,7 +43,7 @@ event bro_init()
Input::remove("input");
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, servers[1.2.3.4];
print outfile, servers[1.2.3.5];

View file

@ -47,7 +47,7 @@ event bro_init()
Input::remove("input");
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
if ( 1 in servers )
print outfile, "VALID";

View file

@ -51,7 +51,7 @@ event bro_init()
Input::remove("input");
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, servers;
close(outfile);

View file

@ -94,7 +94,7 @@ event bro_init()
]);
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
try = try + 1;
print outfile, fmt("Update_finished for %s, try %d", name, try);

View file

@ -48,7 +48,7 @@ event bro_init()
Input::remove("input");
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, servers;
close(outfile);

View file

@ -45,7 +45,7 @@ event bro_init()
}
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, name;
print outfile, source;

View file

@ -123,7 +123,7 @@ event bro_init()
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print outfile, "==========SERVERS============";
print outfile, servers;

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
close(outfile);

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
close(outfile);

View file

@ -42,7 +42,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
close(outfile);

View file

@ -62,7 +62,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
close(outfile);

View file

@ -113,7 +113,7 @@ event bro_init()
}
event Input::update_finished(name: string, source: string)
event Input::end_of_data(name: string, source: string)
{
print fin_out, "==========SERVERS============";
#print fin_out, servers;

View file

@ -56,7 +56,7 @@ event bro_init()
Input::remove("ssh");
}
event Input::update_finished(name: string, source:string)
event Input::end_of_data(name: string, source:string)
{
print outfile, servers;
close(outfile);