rename the update_finished event to end_of_data and make it fire in

more cases.

It will now not only fire after table-reads have been completed,
but also after the last event of a whole-file-read (or whole-db-read, etc.).

The interface also has been extended a bit to allow readers to
directly fire the event should they so choose. This allows the
event to be fired in direct table-setting/event-sending modes,
which was previously not possible.
This commit is contained in:
Bernhard Amann 2012-10-10 11:51:20 -07:00
parent 296686d5ca
commit a6d87fcab7
29 changed files with 101 additions and 94 deletions

View file

@ -98,12 +98,12 @@ been completed. Because of this, it is, for example, possible to call
will remain queued until the first read has been completed. will remain queued until the first read has been completed.
Once the input framework finishes reading from a data source, it fires Once the input framework finishes reading from a data source, it fires
the ``update_finished`` event. Once this event has been received all data the ``end_of_data`` event. Once this event has been received all data
from the input file is available in the table. from the input file is available in the table.
.. code:: bro .. code:: bro
event Input::update_finished(name: string, source: string) { event Input::end_of_data(name: string, source: string) {
# now all data is in the table # now all data is in the table
print blacklist; print blacklist;
} }
@ -129,7 +129,7 @@ deal with changing data files.
The first, very basic method is an explicit refresh of an input stream. When The first, very basic method is an explicit refresh of an input stream. When
an input stream is open, the function ``force_update`` can be called. This an input stream is open, the function ``force_update`` can be called. This
will trigger a complete refresh of the table; any changed elements from the will trigger a complete refresh of the table; any changed elements from the
file will be updated. After the update is finished the ``update_finished`` file will be updated. After the update is finished the ``end_of_data``
event will be raised. event will be raised.
In our example the call would look like: In our example the call would look like:
@ -142,7 +142,7 @@ The input framework also supports two automatic refresh modes. The first mode
continually checks if a file has been changed. If the file has been changed, it continually checks if a file has been changed. If the file has been changed, it
is re-read and the data in the Bro table is updated to reflect the current is re-read and the data in the Bro table is updated to reflect the current
state. Each time a change has been detected and all the new data has been state. Each time a change has been detected and all the new data has been
read into the table, the ``update_finished`` event is raised. read into the table, the ``end_of_data`` event is raised.
The second mode is a streaming mode. This mode assumes that the source data The second mode is a streaming mode. This mode assumes that the source data
file is an append-only file to which new data is continually appended. Bro file is an append-only file to which new data is continually appended. Bro
@ -150,7 +150,7 @@ continually checks for new data at the end of the file and will add the new
data to the table. If newer lines in the file have the same index as previous data to the table. If newer lines in the file have the same index as previous
lines, they will overwrite the values in the output table. Because of the lines, they will overwrite the values in the output table. Because of the
nature of streaming reads (data is continually added to the table), nature of streaming reads (data is continually added to the table),
the ``update_finished`` event is never raised when using streaming reads. the ``end_of_data`` event is never raised when using streaming reads.
The reading mode can be selected by setting the ``mode`` option of the The reading mode can be selected by setting the ``mode`` option of the
add_table call. Valid values are ``MANUAL`` (the default), ``REREAD`` add_table call. Valid values are ``MANUAL`` (the default), ``REREAD``

View file

@ -125,8 +125,8 @@ export {
## id: string value identifying the stream ## id: string value identifying the stream
global force_update: function(id: string) : bool; global force_update: function(id: string) : bool;
## Event that is called, when the update of a specific source is finished ## Event that is called, when the end of a data source has been reached, usually after an update
global update_finished: event(name: string, source:string); global end_of_data: event(name: string, source:string);
} }
@load base/input.bif @load base/input.bif

View file

@ -196,7 +196,7 @@ Manager::TableStream::~TableStream()
Manager::Manager() Manager::Manager()
{ {
update_finished = internal_handler("Input::update_finished"); end_of_data = internal_handler("Input::end_of_data");
} }
Manager::~Manager() Manager::~Manager()
@ -1169,8 +1169,12 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", i->name.c_str()); DBG_LOG(DBG_INPUT, "Got EndCurrentSend stream %s", i->name.c_str());
#endif #endif
if ( i->stream_type == EVENT_STREAM ) // nothing to do.. if ( i->stream_type == EVENT_STREAM )
{
// just signal the end of the data source
SendEndOfData(i);
return; return;
}
assert(i->stream_type == TABLE_STREAM); assert(i->stream_type == TABLE_STREAM);
TableStream* stream = (TableStream*) i; TableStream* stream = (TableStream*) i;
@ -1251,12 +1255,27 @@ void Manager::EndCurrentSend(ReaderFrontend* reader)
stream->currDict->SetDeleteFunc(input_hash_delete_func); stream->currDict->SetDeleteFunc(input_hash_delete_func);
#ifdef DEBUG #ifdef DEBUG
DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s, queueing update_finished event", DBG_LOG(DBG_INPUT, "EndCurrentSend complete for stream %s",
i->name.c_str()); i->name.c_str());
#endif #endif
// Send event that the current update is indeed finished. SendEndOfData(i);
SendEvent(update_finished, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source)); }
void Manager::SendEndOfData(ReaderFrontend* reader) {
Stream *i = FindStream(reader);
if ( i == 0 )
{
reporter->InternalError("Unknown reader in SendEndOfData");
return;
}
SendEndOfData(i);
}
void Manager::SendEndOfData(const Stream *i) {
SendEvent(end_of_data, 2, new StringVal(i->name.c_str()), new StringVal(i->info->source));
} }
void Manager::Put(ReaderFrontend* reader, Value* *vals) void Manager::Put(ReaderFrontend* reader, Value* *vals)

View file

@ -89,6 +89,7 @@ protected:
friend class EndCurrentSendMessage; friend class EndCurrentSendMessage;
friend class ReaderClosedMessage; friend class ReaderClosedMessage;
friend class DisableMessage; friend class DisableMessage;
friend class EndOfDataMessage;
// For readers to write to input stream in direct mode (reporting // For readers to write to input stream in direct mode (reporting
// new/deleted values directly). Functions take ownership of // new/deleted values directly). Functions take ownership of
@ -96,6 +97,9 @@ protected:
void Put(ReaderFrontend* reader, threading::Value* *vals); void Put(ReaderFrontend* reader, threading::Value* *vals);
void Clear(ReaderFrontend* reader); void Clear(ReaderFrontend* reader);
bool Delete(ReaderFrontend* reader, threading::Value* *vals); bool Delete(ReaderFrontend* reader, threading::Value* *vals);
// Trigger sending the End-of-Data event when the input source has
// finished reading. Just use in direct mode.
void SendEndOfData(ReaderFrontend* reader);
// For readers to write to input stream in indirect mode (manager is // For readers to write to input stream in indirect mode (manager is
// monitoring new/deleted values) Functions take ownership of // monitoring new/deleted values) Functions take ownership of
@ -154,16 +158,19 @@ private:
// equivalend in threading cannot be used, because we have support // equivalend in threading cannot be used, because we have support
// different types from the log framework // different types from the log framework
bool IsCompatibleType(BroType* t, bool atomic_only=false); bool IsCompatibleType(BroType* t, bool atomic_only=false);
// Check if a record is made up of compatible types and return a list // Check if a record is made up of compatible types and return a list
// of all fields that are in the record in order. Recursively unrolls // of all fields that are in the record in order. Recursively unrolls
// records // records
bool UnrollRecordType(vector<threading::Field*> *fields, const RecordType *rec, const string& nameprepend, bool allow_file_func); bool UnrollRecordType(vector<threading::Field*> *fields, const RecordType *rec, const string& nameprepend, bool allow_file_func);
// Send events // Send events
void SendEvent(EventHandlerPtr ev, const int numvals, ...); void SendEvent(EventHandlerPtr ev, const int numvals, ...);
void SendEvent(EventHandlerPtr ev, list<Val*> events); void SendEvent(EventHandlerPtr ev, list<Val*> events);
// Implementation of SendEndOfData (send end_of_data event)
void SendEndOfData(const Stream *i);
// Call predicate function and return result. // Call predicate function and return result.
bool CallPred(Func* pred_func, const int numvals, ...); bool CallPred(Func* pred_func, const int numvals, ...);
@ -193,6 +200,7 @@ private:
// Converts a Bro ListVal to a RecordVal given the record type. // Converts a Bro ListVal to a RecordVal given the record type.
RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position); RecordVal* ListValToRecordVal(ListVal* list, RecordType *request_type, int* position);
Stream* FindStream(const string &name); Stream* FindStream(const string &name);
Stream* FindStream(ReaderFrontend* reader); Stream* FindStream(ReaderFrontend* reader);
@ -200,7 +208,7 @@ private:
map<ReaderFrontend*, Stream*> readers; map<ReaderFrontend*, Stream*> readers;
EventHandlerPtr update_finished; EventHandlerPtr end_of_data;
}; };

View file

@ -108,6 +108,20 @@ public:
private: private:
}; };
class EndOfDataMessage : public threading::OutputMessage<ReaderFrontend> {
public:
EndOfDataMessage(ReaderFrontend* reader)
: threading::OutputMessage<ReaderFrontend>("EndOfData", reader) {}
virtual bool Process()
{
input_mgr->SendEndOfData(Object());
return true;
}
private:
};
class ReaderClosedMessage : public threading::OutputMessage<ReaderFrontend> { class ReaderClosedMessage : public threading::OutputMessage<ReaderFrontend> {
public: public:
ReaderClosedMessage(ReaderFrontend* reader) ReaderClosedMessage(ReaderFrontend* reader)
@ -183,6 +197,11 @@ void ReaderBackend::EndCurrentSend()
SendOut(new EndCurrentSendMessage(frontend)); SendOut(new EndCurrentSendMessage(frontend));
} }
void ReaderBackend::EndOfData()
{
SendOut(new EndOfDataMessage(frontend));
}
void ReaderBackend::SendEntry(Value* *vals) void ReaderBackend::SendEntry(Value* *vals)
{ {
SendOut(new SendEntryMessage(frontend, vals)); SendOut(new SendEntryMessage(frontend, vals));

View file

@ -281,6 +281,16 @@ protected:
*/ */
void Clear(); void Clear();
/**
* Method telling the manager that we finished reading the current
* data source. Will trigger an end_of_data event.
*
* Note: When using SendEntry/the tracking mode this is triggered
* automatically by EndCurrentSend(). Only use if not using the
* tracking mode. Otherwise the event will be sent twice.
*/
void EndOfData();
// Content-sending-functions (tracking mode): Only changed lines are propagated. // Content-sending-functions (tracking mode): Only changed lines are propagated.
/** /**

View file

@ -4,13 +4,6 @@ print outfile, A::description;
print outfile, A::tpe; print outfile, A::tpe;
print outfile, A::i; print outfile, A::i;
print outfile, A::b; print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={ }, config={
}] }]
@ -23,13 +16,6 @@ print outfile, A::description;
print outfile, A::tpe; print outfile, A::tpe;
print outfile, A::i; print outfile, A::i;
print outfile, A::b; print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={ }, config={
}] }]
@ -42,13 +28,6 @@ print outfile, A::description;
print outfile, A::tpe; print outfile, A::tpe;
print outfile, A::i; print outfile, A::i;
print outfile, A::b; print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={ }, config={
}] }]
@ -61,13 +40,6 @@ print outfile, A::description;
print outfile, A::tpe; print outfile, A::tpe;
print outfile, A::i; print outfile, A::i;
print outfile, A::b; print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={ }, config={
}] }]
@ -80,13 +52,6 @@ print outfile, A::description;
print outfile, A::tpe; print outfile, A::tpe;
print outfile, A::i; print outfile, A::i;
print outfile, A::b; print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={ }, config={
}] }]
@ -99,13 +64,6 @@ print outfile, A::description;
print outfile, A::tpe; print outfile, A::tpe;
print outfile, A::i; print outfile, A::i;
print outfile, A::b; print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={ }, config={
}] }]
@ -118,16 +76,10 @@ print outfile, A::description;
print outfile, A::tpe; print outfile, A::tpe;
print outfile, A::i; print outfile, A::i;
print outfile, A::b; print outfile, A::b;
try = try + 1;
if (7 == try)
{
close(outfile);
terminate();
}
}, config={ }, config={
}] }]
Input::EVENT_NEW Input::EVENT_NEW
7 7
T T
End-of-data

View file

@ -55,7 +55,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
print outfile, to_count(servers[-42]$ns); # try to actually use a string. If null-termination is wrong this will fail. print outfile, to_count(servers[-42]$ns); # try to actually use a string. If null-termination is wrong this will fail.

View file

@ -37,7 +37,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -73,7 +73,7 @@ event bro_init()
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, "==========SERVERS============"; print outfile, "==========SERVERS============";
print outfile, servers; print outfile, servers;

View file

@ -40,7 +40,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -22,7 +22,6 @@
@load frameworks/communication/listen @load frameworks/communication/listen
global outfile: file; global outfile: file;
global try: count;
module A; module A;
@ -37,18 +36,18 @@ event line(description: Input::EventDescription, tpe: Input::Event, i: int, b: b
print outfile, tpe; print outfile, tpe;
print outfile, i; print outfile, i;
print outfile, b; print outfile, b;
try = try + 1;
if ( try == 7 )
{
close(outfile);
terminate();
}
} }
event bro_init() event bro_init()
{ {
try = 0;
outfile = open("../out"); outfile = open("../out");
Input::add_event([$source="../input.log", $name="input", $fields=Val, $ev=line, $want_record=F]); Input::add_event([$source="../input.log", $name="input", $fields=Val, $ev=line, $want_record=F]);
Input::remove("input"); Input::remove("input");
} }
event Input::end_of_data(name: string, source:string)
{
print outfile, "End-of-data";
close(outfile);
terminate();
}

View file

@ -41,7 +41,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
terminate(); terminate();

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("input"); Input::remove("input");
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("input"); Input::remove("input");
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -48,7 +48,7 @@ event bro_init()
Input::remove("input"); Input::remove("input");
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -43,7 +43,7 @@ event bro_init()
Input::remove("input"); Input::remove("input");
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, servers[1.2.3.4]; print outfile, servers[1.2.3.4];
print outfile, servers[1.2.3.5]; print outfile, servers[1.2.3.5];

View file

@ -47,7 +47,7 @@ event bro_init()
Input::remove("input"); Input::remove("input");
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
if ( 1 in servers ) if ( 1 in servers )
print outfile, "VALID"; print outfile, "VALID";

View file

@ -51,7 +51,7 @@ event bro_init()
Input::remove("input"); Input::remove("input");
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -94,7 +94,7 @@ event bro_init()
]); ]);
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
try = try + 1; try = try + 1;
print outfile, fmt("Update_finished for %s, try %d", name, try); print outfile, fmt("Update_finished for %s, try %d", name, try);

View file

@ -48,7 +48,7 @@ event bro_init()
Input::remove("input"); Input::remove("input");
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -45,7 +45,7 @@ event bro_init()
} }
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, name; print outfile, name;
print outfile, source; print outfile, source;

View file

@ -123,7 +123,7 @@ event bro_init()
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print outfile, "==========SERVERS============"; print outfile, "==========SERVERS============";
print outfile, servers; print outfile, servers;

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -38,7 +38,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -42,7 +42,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -62,7 +62,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);

View file

@ -113,7 +113,7 @@ event bro_init()
} }
event Input::update_finished(name: string, source: string) event Input::end_of_data(name: string, source: string)
{ {
print fin_out, "==========SERVERS============"; print fin_out, "==========SERVERS============";
#print fin_out, servers; #print fin_out, servers;

View file

@ -56,7 +56,7 @@ event bro_init()
Input::remove("ssh"); Input::remove("ssh");
} }
event Input::update_finished(name: string, source:string) event Input::end_of_data(name: string, source:string)
{ {
print outfile, servers; print outfile, servers;
close(outfile); close(outfile);