Rotation support.

This follows rather closely how rotation currently works in
rotate-logs.bro. logging.bro now defines:

        # Default rotation interval; zero disables rotation.
        const default_rotation_interval = 0secs &redef;

        # Default naming suffix format.
        const default_rotation_date_format = "%y-%m-%d_%H.%M.%S" &redef;

        # Default postprocessor for writers outputting into files.
        const default_rotation_postprocessor = "" &redef;

        # Default function to construct the name of the rotated file.
        # The default implementation includes
        # default_rotation_date_format into the file name.
        global default_rotation_path_func: function(info: RotationInfo) : string &redef;

Writer support for rotation is optional, usually it will only make
sense for file-based writers.

TODO: Currently, there's no way to customize rotation on a per file
basis, there are only the global defaults as described above.
Individual customization is coming next.
This commit is contained in:
Robin Sommer 2011-03-06 19:28:48 -08:00
parent 90af0d06c3
commit d6cef16f77
16 changed files with 387 additions and 68 deletions

View file

@ -3,6 +3,7 @@
#include "Event.h"
#include "EventHandler.h"
#include "NetVar.h"
#include "Net.h"
#include "LogWriterAscii.h"
@ -22,6 +23,7 @@ LogWriterDefinition log_writers[] = {
struct LogMgr::Filter {
string name;
EnumVal* id;
Func* pred;
Func* path_func;
string path;
@ -37,6 +39,12 @@ struct LogMgr::Filter {
~Filter();
};
struct LogMgr::WriterInfo {
double open_time;
Timer* rotation_timer;
LogWriter *writer;
};
struct LogMgr::Stream {
EnumVal* id;
string name;
@ -45,7 +53,9 @@ struct LogMgr::Stream {
list<Filter*> filters;
typedef pair<int, string> WriterPathPair;
typedef map<WriterPathPair, LogWriter *> WriterMap;
typedef map<WriterPathPair, WriterInfo*> WriterMap;
WriterMap writers; // Writers indexed by id/path pair.
~Stream();
@ -213,7 +223,15 @@ LogMgr::Stream::~Stream()
Unref(columns);
for ( WriterMap::iterator i = writers.begin(); i != writers.end(); i++ )
{
WriterInfo* winfo = i->second;
if ( winfo->rotation_timer )
timer_mgr->Cancel(winfo->rotation_timer);
delete winfo->writer;
delete i->second;
}
for ( list<Filter*>::iterator f = filters.begin(); f != filters.end(); ++f )
delete *f;
@ -248,7 +266,7 @@ void LogMgr::RemoveDisabledWriters(Stream* stream)
for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ )
{
if ( j->second->Disabled() )
if ( j->second->writer->Disabled() )
{
delete j->second;
disabled.push_back(j->first);
@ -413,16 +431,26 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
// Create a new Filter instance.
Val* pred = fval->Lookup(rtype->FieldOffset("pred"));
Val* path_func = fval->Lookup(rtype->FieldOffset("path_func"));
Val* name = fval->LookupWithDefault(rtype->FieldOffset("name"));
Val* pred = fval->LookupWithDefault(rtype->FieldOffset("pred"));
Val* path_func = fval->LookupWithDefault(rtype->FieldOffset("path_func"));
Val* log_local = fval->LookupWithDefault(rtype->FieldOffset("log_local"));
Val* log_remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"));
Filter* filter = new Filter;
filter->name = fval->Lookup(rtype->FieldOffset("name"))->AsString()->CheckString();
filter->name = name->AsString()->CheckString();
filter->id = id->Ref()->AsEnumVal();
filter->pred = pred ? pred->AsFunc() : 0;
filter->path_func = path_func ? path_func->AsFunc() : 0;
filter->writer = writer->Ref()->AsEnumVal();
filter->local = fval->LookupWithDefault(rtype->FieldOffset("log_local"))->AsBool();
filter->remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"))->AsBool();
filter->local = log_local->AsBool();
filter->remote = log_remote->AsBool();
Unref(name);
Unref(pred);
Unref(path_func);
Unref(log_local);
Unref(log_remote);
// TODO: Check that the predciate is of the right type.
@ -580,7 +608,7 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
if ( w != stream->writers.end() )
// We have a writer already.
writer = w->second;
writer = w->second->writer;
else
{
// No, need to create one.
@ -669,10 +697,13 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
vals[i]->val.uint_val = val->InternalUnsigned();
break;
case TYPE_PORT:
vals[i]->val.uint_val = val->AsPortVal()->Port();
break;
case TYPE_SUBNET:
vals[i]->val.subnet_val = *val->AsSubNet();
break;
@ -719,7 +750,7 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, int n
if ( w != stream->writers.end() )
// If we already have a writer for this. That's fine, we just return
// it.
return w->second;
return w->second->writer;
// Need to instantiate a new writer.
@ -766,7 +797,13 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, int n
return 0;
}
stream->writers.insert(Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path), writer_obj));
WriterInfo* winfo = new WriterInfo;
winfo->writer = writer_obj;
winfo->open_time = network_time;
winfo->rotation_timer = 0;
InstallRotationTimer(winfo);
stream->writers.insert(Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path), winfo));
return writer_obj;
}
@ -799,7 +836,7 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, Lo
return false;
}
bool success = w->second->Write(num_fields, vals);
bool success = w->second->writer->Write(num_fields, vals);
DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to path '%s' on stream '%s' [%s]", path.c_str(), stream->name.c_str(), (success ? "ok" : "error"));
@ -817,7 +854,7 @@ void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer)
for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ )
{
LogWriter* writer = i->second;
LogWriter* writer = i->second->writer;
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id, &writer_val, i->first.second, writer->NumFields(), writer->Fields());
}
@ -831,7 +868,7 @@ bool LogMgr::SetBuf(EnumVal* id, bool enabled)
return false;
for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ )
i->second->SetBuf(enabled);
i->second->writer->SetBuf(enabled);
RemoveDisabledWriters(stream);
@ -845,7 +882,7 @@ bool LogMgr::Flush(EnumVal* id)
return false;
for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ )
i->second->Flush();
i->second->writer->Flush();
RemoveDisabledWriters(stream);
@ -857,3 +894,102 @@ void LogMgr::Error(LogWriter* writer, const char* msg)
run_time(fmt("error with writer for %s: %s", writer->Path().c_str(), msg));
}
// Timer which on dispatching rotates the filter.
class RotationTimer : public Timer {
public:
RotationTimer(double t, LogMgr::WriterInfo* arg_winfo, bool arg_rotate)
: Timer(t, TIMER_ROTATE) { winfo = arg_winfo; rotate = arg_rotate; }
~RotationTimer();
void Dispatch(double t, int is_expire);
protected:
LogMgr::WriterInfo* winfo;
bool rotate;
};
RotationTimer::~RotationTimer()
{
if ( winfo->rotation_timer == this )
winfo->rotation_timer = 0;
}
void RotationTimer::Dispatch(double t, int is_expire)
{
winfo->rotation_timer = 0;
if ( rotate )
log_mgr->Rotate(winfo);
if ( ! is_expire )
{
winfo->open_time = network_time;
log_mgr->InstallRotationTimer(winfo);
}
}
void LogMgr::InstallRotationTimer(WriterInfo* winfo)
{
if ( terminating )
return;
if ( winfo->rotation_timer )
{
timer_mgr->Cancel(winfo->rotation_timer);
winfo->rotation_timer = 0;
}
double rotation_interval = BifConst::Log::default_rotation_interval;
if ( rotation_interval )
{
// When this is called for the first time, network_time can still be
// zero. If so, we set a timer which fires immediately but doesn't
// rotate when it expires.
if ( ! network_time )
winfo->rotation_timer = new RotationTimer(1, winfo, false);
else
{
if ( ! winfo->open_time )
winfo->open_time = network_time;
const char* base_time = log_rotate_base_time ?
log_rotate_base_time->AsString()->CheckString() : 0;
double delta_t =
calc_next_rotate(rotation_interval, base_time);
winfo->rotation_timer = new RotationTimer(network_time + delta_t, winfo, true);
}
timer_mgr->Add(winfo->rotation_timer);
DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f", winfo->writer->Path().c_str(), winfo->rotation_timer->Time());
}
}
void LogMgr::Rotate(WriterInfo* winfo)
{
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", winfo->writer->Path().c_str(), network_time);
// Create the RotationInfo record.
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
info->Assign(0, new StringVal(winfo->writer->Path().c_str()));
info->Assign(1, new Val(winfo->open_time, TYPE_TIME));
info->Assign(2, new Val(network_time, TYPE_TIME));
// Call the function building us the new path.
Func* rotation_path_func = internal_func("Log::default_rotation_path_func");
string rotation_postprocessor = BifConst::Log::default_rotation_postprocessor->AsString()->CheckString();
val_list vl(1);
vl.append(info);
Val* result = rotation_path_func->Call(&vl);
string new_path = result->AsString()->CheckString();
Unref(result);
winfo->writer->Rotate(new_path, rotation_postprocessor, winfo->open_time, network_time, terminating);
}