zeek/src/logging/Manager.cc

1319 lines
30 KiB
C++

// See the file "COPYING" in the main distribution directory for copyright.
#include <algorithm>
#include "../Event.h"
#include "../EventHandler.h"
#include "../NetVar.h"
#include "../Net.h"
#include "threading/Manager.h"
#include "threading/SerialTypes.h"
#include "Manager.h"
#include "WriterFrontend.h"
#include "WriterBackend.h"
#include "writers/Ascii.h"
#include "writers/None.h"
#ifdef USE_ELASTICSEARCH
#include "writers/ElasticSearch.h"
#endif
#ifdef USE_DATASERIES
#include "writers/DataSeries.h"
#endif
using namespace logging;
// Structure describing a log writer type.
struct WriterDefinition {
bro_int_t type; // The type.
const char *name; // Descriptive name for error messages.
bool (*init)(); // An optional one-time initialization function.
WriterBackend* (*factory)(WriterFrontend* frontend); // A factory function creating instances.
};
// Static table defining all availabel log writers.
WriterDefinition log_writers[] = {
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
#ifdef USE_ELASTICSEARCH
{ BifEnum::Log::WRITER_ELASTICSEARCH, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
#endif
#ifdef USE_DATASERIES
{ BifEnum::Log::WRITER_DATASERIES, "DataSeries", 0, writer::DataSeries::Instantiate },
#endif
// End marker, don't touch.
{ BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)(WriterFrontend* frontend))0 }
};
struct Manager::Filter {
string name;
EnumVal* id;
Func* pred;
Func* path_func;
string path;
Val* path_val;
EnumVal* writer;
bool local;
bool remote;
double interval;
Func* postprocessor;
int num_fields;
threading::Field** fields;
// Vector indexed by field number. Each element is a list of record
// indices defining a path leading to the value across potential
// sub-records.
vector<list<int> > indices;
~Filter();
};
struct Manager::WriterInfo {
EnumVal* type;
double open_time;
Timer* rotation_timer;
double interval;
Func* postprocessor;
WriterFrontend* writer;
};
struct Manager::Stream {
EnumVal* id;
bool enabled;
string name;
RecordType* columns;
EventHandlerPtr event;
list<Filter*> filters;
typedef pair<int, string> WriterPathPair;
typedef map<WriterPathPair, WriterInfo*> WriterMap;
WriterMap writers; // Writers indexed by id/path pair.
~Stream();
};
Manager::Filter::~Filter()
{
for ( int i = 0; i < num_fields; ++i )
delete fields[i];
free(fields);
Unref(path_val);
}
Manager::Stream::~Stream()
{
Unref(columns);
for ( WriterMap::iterator i = writers.begin(); i != writers.end(); i++ )
{
WriterInfo* winfo = i->second;
if ( winfo->rotation_timer )
timer_mgr->Cancel(winfo->rotation_timer);
Unref(winfo->type);
delete winfo->writer;
delete winfo;
}
for ( list<Filter*>::iterator f = filters.begin(); f != filters.end(); ++f )
delete *f;
}
Manager::Manager()
{
rotations_pending = 0;
}
Manager::~Manager()
{
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
delete *s;
}
list<string> Manager::SupportedFormats()
{
list<string> formats;
for ( WriterDefinition* ld = log_writers; ld->type != BifEnum::Log::WRITER_DEFAULT; ++ld )
formats.push_back(ld->name);
return formats;
}
WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
{
WriterDefinition* ld = log_writers;
while ( true )
{
if ( ld->type == BifEnum::Log::WRITER_DEFAULT )
{
reporter->Error("unknown writer type requested");
return 0;
}
if ( ld->type != type )
{
// Not the right one.
++ld;
continue;
}
// If the writer has an init function, call it.
if ( ld->init )
{
if ( (*ld->init)() )
// Clear the init function so that we won't
// call it again later.
ld->init = 0;
else
{
// Init failed, disable by deleting factory
// function.
ld->factory = 0;
reporter->Error("initialization of writer %s failed", ld->name);
return 0;
}
}
if ( ! ld->factory )
// Oops, we can't instantiate this guy.
return 0;
// All done.
break;
}
assert(ld->factory);
WriterBackend* backend = (*ld->factory)(frontend);
assert(backend);
frontend->ty_name = ld->name;
return backend;
}
Manager::Stream* Manager::FindStream(EnumVal* id)
{
unsigned int idx = id->AsEnum();
if ( idx >= streams.size() || ! streams[idx] )
return 0;
return streams[idx];
}
Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer)
{
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
if ( ! *s )
continue;
for ( Stream::WriterMap::iterator i = (*s)->writers.begin(); i != (*s)->writers.end(); i++ )
{
WriterInfo* winfo = i->second;
if ( winfo->writer == writer )
return winfo;
}
}
return 0;
}
void Manager::RemoveDisabledWriters(Stream* stream)
{
list<Stream::WriterPathPair> disabled;
for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ )
{
if ( j->second->writer->Disabled() )
{
j->second->writer->Stop();
delete j->second;
disabled.push_back(j->first);
}
}
for ( list<Stream::WriterPathPair>::iterator j = disabled.begin(); j != disabled.end(); j++ )
stream->writers.erase(*j);
}
bool Manager::CreateStream(EnumVal* id, RecordVal* sval)
{
RecordType* rtype = sval->Type()->AsRecordType();
if ( ! same_type(rtype, BifType::Record::Log::Stream, 0) )
{
reporter->Error("sval argument not of right type");
return false;
}
RecordType* columns = sval->Lookup(rtype->FieldOffset("columns"))
->AsType()->AsTypeType()->Type()->AsRecordType();
bool log_attr_present = false;
for ( int i = 0; i < columns->NumFields(); i++ )
{
if ( ! (columns->FieldDecl(i)->FindAttr(ATTR_LOG)) )
continue;
if ( ! threading::Value::IsCompatibleType(columns->FieldType(i)) )
{
reporter->Error("type of field '%s' is not support for logging output",
columns->FieldName(i));
return false;
}
log_attr_present = true;
}
if ( ! log_attr_present )
{
reporter->Error("logged record type does not have any &log attributes");
return false;
}
Val* event_val = sval->Lookup(rtype->FieldOffset("ev"));
Func* event = event_val ? event_val->AsFunc() : 0;
if ( event )
{
// Make sure the event is prototyped as expected.
FuncType* etype = event->FType()->AsFuncType();
if ( ! etype->IsEvent() )
{
reporter->Error("stream event is a function, not an event");
return false;
}
const type_list* args = etype->ArgTypes()->Types();
if ( args->length() != 1 )
{
reporter->Error("stream event must take a single argument");
return false;
}
if ( ! same_type((*args)[0], columns) )
{
reporter->Error("stream event's argument type does not match column record type");
return new Val(0, TYPE_BOOL);
}
}
// Make sure the vector has an entries for all streams up to the one
// given.
unsigned int idx = id->AsEnum();
while ( idx >= streams.size() )
streams.push_back(0);
if ( streams[idx] )
// We already know this one, delete the previous definition.
delete streams[idx];
// Create new stream.
streams[idx] = new Stream;
streams[idx]->id = id->Ref()->AsEnumVal();
streams[idx]->enabled = true;
streams[idx]->name = id->Type()->AsEnumType()->Lookup(idx);
streams[idx]->event = event ? event_registry->Lookup(event->GetID()->Name()) : 0;
streams[idx]->columns = columns->Ref()->AsRecordType();
DBG_LOG(DBG_LOGGING, "Created new logging stream '%s', raising event %s",
streams[idx]->name.c_str(), event ? streams[idx]->event->Name() : "<none>");
return true;
}
bool Manager::EnableStream(EnumVal* id)
{
Stream* stream = FindStream(id);
if ( ! stream )
return false;
if ( stream->enabled )
return true;
stream->enabled = true;
DBG_LOG(DBG_LOGGING, "Reenabled logging stream '%s'", stream->name.c_str());
return true;
}
bool Manager::DisableStream(EnumVal* id)
{
Stream* stream = FindStream(id);
if ( ! stream )
return false;
if ( ! stream->enabled )
return true;
stream->enabled = false;
DBG_LOG(DBG_LOGGING, "Disabled logging stream '%s'", stream->name.c_str());
return true;
}
// Helper for recursive record field unrolling.
bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
TableVal* include, TableVal* exclude, string path, list<int> indices)
{
for ( int i = 0; i < rt->NumFields(); ++i )
{
BroType* t = rt->FieldType(i);
// Ignore if &log not specified.
if ( ! rt->FieldDecl(i)->FindAttr(ATTR_LOG) )
continue;
list<int> new_indices = indices;
new_indices.push_back(i);
// Build path name.
string new_path;
if ( ! path.size() )
new_path = rt->FieldName(i);
else
new_path = path + "." + rt->FieldName(i);
if ( t->InternalType() == TYPE_INTERNAL_OTHER )
{
if ( t->Tag() == TYPE_RECORD )
{
// Recurse.
if ( ! TraverseRecord(stream, filter,
t->AsRecordType(),
include,
exclude,
new_path,
new_indices) )
return false;
continue;
}
else if ( t->Tag() == TYPE_TABLE &&
t->AsTableType()->IsSet() )
{
// That's ok, we handle it below.
}
else if ( t->Tag() == TYPE_VECTOR )
{
// That's ok, we handle it below.
}
else if ( t->Tag() == TYPE_FILE )
{
// That's ok, we handle it below.
}
else if ( t->Tag() == TYPE_FUNC )
{
// That's ok, we handle it below.
}
else
{
reporter->Error("unsupported field type for log column");
return false;
}
}
// If include fields are specified, only include if explicitly listed.
if ( include )
{
StringVal* new_path_val = new StringVal(new_path.c_str());
bool result = include->Lookup(new_path_val);
Unref(new_path_val);
if ( ! result )
continue;
}
// If exclude fields are specified, do not only include if listed.
if ( exclude )
{
StringVal* new_path_val = new StringVal(new_path.c_str());
bool result = exclude->Lookup(new_path_val);
Unref(new_path_val);
if ( result )
continue;
}
// Alright, we want this field.
filter->indices.push_back(new_indices);
filter->fields = (threading::Field**)
realloc(filter->fields,
sizeof(threading::Field) * ++filter->num_fields);
if ( ! filter->fields )
{
reporter->Error("out of memory in add_filter");
return false;
}
threading::Field* field = new threading::Field();
field->name = new_path;
field->type = t->Tag();
field->optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
if ( field->type == TYPE_TABLE )
field->subtype = t->AsSetType()->Indices()->PureType()->Tag();
else if ( field->type == TYPE_VECTOR )
field->subtype = t->AsVectorType()->YieldType()->Tag();
filter->fields[filter->num_fields - 1] = field;
}
return true;
}
bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
{
RecordType* rtype = fval->Type()->AsRecordType();
if ( ! same_type(rtype, BifType::Record::Log::Filter, 0) )
{
reporter->Error("filter argument not of right type");
return false;
}
Stream* stream = FindStream(id);
if ( ! stream )
return false;
// Find the right writer type.
int idx = rtype->FieldOffset("writer");
EnumVal* writer = fval->LookupWithDefault(idx)->AsEnumVal();
// Create a new Filter instance.
Val* name = fval->LookupWithDefault(rtype->FieldOffset("name"));
Val* pred = fval->LookupWithDefault(rtype->FieldOffset("pred"));
Val* path_func = fval->LookupWithDefault(rtype->FieldOffset("path_func"));
Val* log_local = fval->LookupWithDefault(rtype->FieldOffset("log_local"));
Val* log_remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"));
Val* interv = fval->LookupWithDefault(rtype->FieldOffset("interv"));
Val* postprocessor = fval->LookupWithDefault(rtype->FieldOffset("postprocessor"));
Filter* filter = new Filter;
filter->name = name->AsString()->CheckString();
filter->id = id->Ref()->AsEnumVal();
filter->pred = pred ? pred->AsFunc() : 0;
filter->path_func = path_func ? path_func->AsFunc() : 0;
filter->writer = writer->Ref()->AsEnumVal();
filter->local = log_local->AsBool();
filter->remote = log_remote->AsBool();
filter->interval = interv->AsInterval();
filter->postprocessor = postprocessor ? postprocessor->AsFunc() : 0;
Unref(name);
Unref(pred);
Unref(path_func);
Unref(log_local);
Unref(log_remote);
Unref(interv);
Unref(postprocessor);
// Build the list of fields that the filter wants included, including
// potentially rolling out fields.
Val* include = fval->Lookup(rtype->FieldOffset("include"));
Val* exclude = fval->Lookup(rtype->FieldOffset("exclude"));
filter->num_fields = 0;
filter->fields = 0;
if ( ! TraverseRecord(stream, filter, stream->columns,
include ? include->AsTableVal() : 0,
exclude ? exclude->AsTableVal() : 0,
"", list<int>()) )
return false;
// Get the path for the filter.
Val* path_val = fval->Lookup(rtype->FieldOffset("path"));
if ( path_val )
{
filter->path = path_val->AsString()->CheckString();
filter->path_val = path_val->Ref();
}
else
{
// If no path is given, it's derived based upon the value returned by
// the first call to the filter's path_func (during first write).
filter->path_val = 0;
}
// Remove any filter with the same name we might already have.
RemoveFilter(id, filter->name);
// Add the new one.
stream->filters.push_back(filter);
#ifdef DEBUG
ODesc desc;
writer->Describe(&desc);
DBG_LOG(DBG_LOGGING, "Created new filter '%s' for stream '%s'",
filter->name.c_str(), stream->name.c_str());
DBG_LOG(DBG_LOGGING, " writer : %s", desc.Description());
DBG_LOG(DBG_LOGGING, " path : %s", filter->path.c_str());
DBG_LOG(DBG_LOGGING, " path_func : %s", (filter->path_func ? "set" : "not set"));
DBG_LOG(DBG_LOGGING, " pred : %s", (filter->pred ? "set" : "not set"));
for ( int i = 0; i < filter->num_fields; i++ )
{
threading::Field* field = filter->fields[i];
DBG_LOG(DBG_LOGGING, " field %10s: %s",
field->name.c_str(), type_name(field->type));
}
#endif
return true;
}
bool Manager::RemoveFilter(EnumVal* id, StringVal* name)
{
return RemoveFilter(id, name->AsString()->CheckString());
}
bool Manager::RemoveFilter(EnumVal* id, string name)
{
Stream* stream = FindStream(id);
if ( ! stream )
return false;
for ( list<Filter*>::iterator i = stream->filters.begin();
i != stream->filters.end(); ++i )
{
if ( (*i)->name == name )
{
Filter* filter = *i;
stream->filters.erase(i);
DBG_LOG(DBG_LOGGING, "Removed filter '%s' from stream '%s'",
filter->name.c_str(), stream->name.c_str());
delete filter;
return true;
}
}
// If we don't find the filter, we don't treat that as an error.
DBG_LOG(DBG_LOGGING, "No filter '%s' for removing from stream '%s'",
name.c_str(), stream->name.c_str());
return true;
}
bool Manager::Write(EnumVal* id, RecordVal* columns)
{
bool error = false;
Stream* stream = FindStream(id);
if ( ! stream )
return false;
if ( ! stream->enabled )
return true;
columns = columns->CoerceTo(stream->columns);
if ( ! columns )
{
reporter->Error("incompatible log record type");
return false;
}
// Raise the log event.
if ( stream->event )
{
val_list* vl = new val_list(1);
vl->append(columns->Ref());
mgr.QueueEvent(stream->event, vl, SOURCE_LOCAL);
}
// Send to each of our filters.
for ( list<Filter*>::iterator i = stream->filters.begin();
i != stream->filters.end(); ++i )
{
Filter* filter = *i;
string path = filter->path;
if ( filter->pred )
{
// See whether the predicates indicates that we want
// to log this record.
val_list vl(1);
vl.append(columns->Ref());
int result = 1;
try
{
Val* v = filter->pred->Call(&vl);
result = v->AsBool();
Unref(v);
}
catch ( InterpreterException& e )
{ /* Already reported. */ }
if ( ! result )
continue;
}
if ( filter->path_func )
{
val_list vl(3);
vl.append(id->Ref());
Val* path_arg;
if ( filter->path_val )
path_arg = filter->path_val->Ref();
else
path_arg = new StringVal("");
vl.append(path_arg);
Val* rec_arg;
BroType* rt = filter->path_func->FType()->Args()->FieldType("rec");
if ( rt->Tag() == TYPE_RECORD )
rec_arg = columns->CoerceTo(rt->AsRecordType(), true);
else
// Can be TYPE_ANY here.
rec_arg = columns->Ref();
vl.append(rec_arg);
Val* v = 0;
try
{
v = filter->path_func->Call(&vl);
}
catch ( InterpreterException& e )
{
return false;
}
if ( ! v->Type()->Tag() == TYPE_STRING )
{
reporter->Error("path_func did not return string");
Unref(v);
return false;
}
if ( ! filter->path_val )
{
filter->path = v->AsString()->CheckString();
filter->path_val = v->Ref();
}
path = v->AsString()->CheckString();
Unref(v);
#ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Path function for filter '%s' on stream '%s' return '%s'",
filter->name.c_str(), stream->name.c_str(), path.c_str());
#endif
}
// See if we already have a writer for this path.
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path));
WriterFrontend* writer = 0;
if ( w != stream->writers.end() )
// We know this writer already.
writer = w->second->writer;
else
{
// No, need to create one.
// Copy the fields for WriterFrontend::Init() as it
// will take ownership.
threading::Field** arg_fields = new threading::Field*[filter->num_fields];
for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new threading::Field(*filter->fields[j]);
writer = CreateWriter(stream->id, filter->writer,
path, filter->num_fields,
arg_fields, filter->local, filter->remote);
if ( ! writer )
{
Unref(columns);
return false;
}
}
// Alright, can do the write now.
threading::Value** vals = RecordToFilterVals(stream, filter, columns);
// Write takes ownership of vals.
assert(writer);
writer->Write(filter->num_fields, vals);
#ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'",
filter->name.c_str(), stream->name.c_str());
#endif
}
Unref(columns);
if ( error )
RemoveDisabledWriters(stream);
return true;
}
threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
{
if ( ! ty )
ty = val->Type();
if ( ! val )
return new threading::Value(ty->Tag(), false);
threading::Value* lval = new threading::Value(ty->Tag());
switch ( lval->type ) {
case TYPE_BOOL:
case TYPE_INT:
lval->val.int_val = val->InternalInt();
break;
case TYPE_ENUM:
{
const char* s =
val->Type()->AsEnumType()->Lookup(val->InternalInt());
if ( s )
lval->val.string_val = new string(s);
else
{
val->Type()->Error("enum type does not contain value", val);
lval->val.string_val = new string();
}
break;
}
case TYPE_COUNT:
case TYPE_COUNTER:
lval->val.uint_val = val->InternalUnsigned();
break;
case TYPE_PORT:
lval->val.port_val.port = val->AsPortVal()->Port();
lval->val.port_val.proto = val->AsPortVal()->PortType();
break;
case TYPE_SUBNET:
val->AsSubNet().ConvertToThreadingValue(&lval->val.subnet_val);
break;
case TYPE_ADDR:
val->AsAddr().ConvertToThreadingValue(&lval->val.addr_val);
break;
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
lval->val.double_val = val->InternalDouble();
break;
case TYPE_STRING:
{
const BroString* s = val->AsString();
lval->val.string_val =
new string((const char*) s->Bytes(), s->Len());
break;
}
case TYPE_FILE:
{
const BroFile* f = val->AsFile();
lval->val.string_val = new string(f->Name());
break;
}
case TYPE_FUNC:
{
ODesc d;
const Func* f = val->AsFunc();
f->Describe(&d);
lval->val.string_val = new string(d.Description());
break;
}
case TYPE_TABLE:
{
ListVal* set = val->AsTableVal()->ConvertToPureList();
if ( ! set )
// ConvertToPureList has reported an internal warning
// already. Just keep going by making something up.
set = new ListVal(TYPE_INT);
lval->val.set_val.size = set->Length();
lval->val.set_val.vals = new threading::Value* [lval->val.set_val.size];
for ( int i = 0; i < lval->val.set_val.size; i++ )
lval->val.set_val.vals[i] = ValToLogVal(set->Index(i));
Unref(set);
break;
}
case TYPE_VECTOR:
{
VectorVal* vec = val->AsVectorVal();
lval->val.vector_val.size = vec->Size();
lval->val.vector_val.vals =
new threading::Value* [lval->val.vector_val.size];
for ( int i = 0; i < lval->val.vector_val.size; i++ )
{
lval->val.vector_val.vals[i] =
ValToLogVal(vec->Lookup(i),
vec->Type()->YieldType());
}
break;
}
default:
reporter->InternalError("unsupported type for log_write");
}
return lval;
}
threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
RecordVal* columns)
{
threading::Value** vals = new threading::Value*[filter->num_fields];
for ( int i = 0; i < filter->num_fields; ++i )
{
TypeTag type = TYPE_ERROR;
Val* val = columns;
// For each field, first find the right value, which can
// potentially be nested inside other records.
list<int>& indices = filter->indices[i];
for ( list<int>::iterator j = indices.begin(); j != indices.end(); ++j )
{
type = val->Type()->AsRecordType()->FieldType(*j)->Tag();
val = val->AsRecordVal()->Lookup(*j);
if ( ! val )
{
// Value, or any of its parents, is not set.
vals[i] = new threading::Value(filter->fields[i]->type, false);
break;
}
}
if ( val )
vals[i] = ValToLogVal(val);
}
return vals;
}
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, const threading::Field* const* fields, bool local, bool remote)
{
Stream* stream = FindStream(id);
if ( ! stream )
// Don't know this stream.
return false;
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
if ( w != stream->writers.end() )
// If we already have a writer for this. That's fine, we just
// return it.
return w->second->writer;
WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
assert(writer_obj);
writer_obj->Init(path, num_fields, fields);
WriterInfo* winfo = new WriterInfo;
winfo->type = writer->Ref()->AsEnumVal();
winfo->writer = writer_obj;
winfo->open_time = network_time;
winfo->rotation_timer = 0;
winfo->interval = 0;
winfo->postprocessor = 0;
// Search for a corresponding filter for the writer/path pair and use its
// rotation settings. If no matching filter is found, fall back on
// looking up the logging framework's default rotation interval.
bool found_filter_match = false;
list<Filter*>::const_iterator it;
for ( it = stream->filters.begin(); it != stream->filters.end(); ++it )
{
Filter* f = *it;
if ( f->writer->AsEnum() == writer->AsEnum() &&
f->path == winfo->writer->Path() )
{
found_filter_match = true;
winfo->interval = f->interval;
winfo->postprocessor = f->postprocessor;
break;
}
}
if ( ! found_filter_match )
{
ID* id = global_scope()->Lookup("Log::default_rotation_interval");
assert(id);
winfo->interval = id->ID_Val()->AsInterval();
}
InstallRotationTimer(winfo);
stream->writers.insert(
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path),
winfo));
return writer_obj;
}
void Manager::DeleteVals(int num_fields, threading::Value** vals)
{
// Note this code is duplicated in WriterBackend::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
delete vals[i];
delete [] vals;
}
bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
threading::Value** vals)
{
Stream* stream = FindStream(id);
if ( ! stream )
{
// Don't know this stream.
#ifdef DEBUG
ODesc desc;
id->Describe(&desc);
DBG_LOG(DBG_LOGGING, "unknown stream %s in Manager::Write()",
desc.Description());
#endif
DeleteVals(num_fields, vals);
return false;
}
if ( ! stream->enabled )
{
DeleteVals(num_fields, vals);
return true;
}
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
if ( w == stream->writers.end() )
{
// Don't know this writer.
#ifdef DEBUG
ODesc desc;
id->Describe(&desc);
DBG_LOG(DBG_LOGGING, "unknown writer %s in Manager::Write()",
desc.Description());
#endif
DeleteVals(num_fields, vals);
return false;
}
w->second->writer->Write(num_fields, vals);
DBG_LOG(DBG_LOGGING,
"Wrote pre-filtered record to path '%s' on stream '%s'",
path.c_str(), stream->name.c_str());
return true;
}
void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
{
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
Stream* stream = (*s);
if ( ! stream )
continue;
for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ )
{
WriterFrontend* writer = i->second->writer;
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id,
&writer_val,
i->first.second,
writer->NumFields(),
writer->Fields());
}
}
}
bool Manager::SetBuf(EnumVal* id, bool enabled)
{
Stream* stream = FindStream(id);
if ( ! stream )
return false;
for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ )
i->second->writer->SetBuf(enabled);
RemoveDisabledWriters(stream);
return true;
}
bool Manager::Flush(EnumVal* id)
{
Stream* stream = FindStream(id);
if ( ! stream )
return false;
if ( ! stream->enabled )
return true;
for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ )
i->second->writer->Flush();
RemoveDisabledWriters(stream);
return true;
}
void Manager::Terminate()
{
// Make sure we process all the pending rotations.
while ( rotations_pending )
{
thread_mgr->ForceProcessing(); // A blatant layering violation ...
usleep(1000);
}
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
if ( ! *s )
continue;
Flush((*s)->id);
}
}
// Timer which on dispatching rotates the filter.
class RotationTimer : public Timer {
public:
RotationTimer(double t, Manager::WriterInfo* arg_winfo, bool arg_rotate)
: Timer(t, TIMER_ROTATE)
{
winfo = arg_winfo;
rotate = arg_rotate;
}
~RotationTimer();
void Dispatch(double t, int is_expire);
protected:
Manager::WriterInfo* winfo;
bool rotate;
};
RotationTimer::~RotationTimer()
{
if ( winfo->rotation_timer == this )
winfo->rotation_timer = 0;
}
void RotationTimer::Dispatch(double t, int is_expire)
{
winfo->rotation_timer = 0;
if ( rotate )
log_mgr->Rotate(winfo);
if ( ! is_expire )
{
winfo->open_time = network_time;
log_mgr->InstallRotationTimer(winfo);
}
}
void Manager::InstallRotationTimer(WriterInfo* winfo)
{
if ( terminating )
return;
if ( winfo->rotation_timer )
{
timer_mgr->Cancel(winfo->rotation_timer);
winfo->rotation_timer = 0;
}
double rotation_interval = winfo->interval;
if ( rotation_interval )
{
// When this is called for the first time, network_time can still be
// zero. If so, we set a timer which fires immediately but doesn't
// rotate when it expires.
if ( ! network_time )
winfo->rotation_timer = new RotationTimer(1, winfo, false);
else
{
if ( ! winfo->open_time )
winfo->open_time = network_time;
const char* base_time = log_rotate_base_time ?
log_rotate_base_time->AsString()->CheckString() : 0;
double delta_t =
calc_next_rotate(rotation_interval, base_time);
winfo->rotation_timer =
new RotationTimer(network_time + delta_t, winfo, true);
}
timer_mgr->Add(winfo->rotation_timer);
DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f",
winfo->writer->Path().c_str(), winfo->rotation_timer->Time());
}
}
void Manager::Rotate(WriterInfo* winfo)
{
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
winfo->writer->Path().c_str(), network_time);
// Build a temporary path for the writer to move the file to.
struct tm tm;
char buf[128];
const char* const date_fmt = "%y-%m-%d_%H.%M.%S";
time_t teatime = (time_t)winfo->open_time;
localtime_r(&teatime, &tm);
strftime(buf, sizeof(buf), date_fmt, &tm);
string tmp = string(fmt("%s-%s", winfo->writer->Path().c_str(), buf));
// Trigger the rotation.
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
++rotations_pending;
}
bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating)
{
--rotations_pending;
if ( ! writer )
// Writer didn't produce local output.
return true;
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
writer->Path().c_str(), network_time, new_name.c_str());
WriterInfo* winfo = FindWriter(writer);
if ( ! winfo )
return true;
// Create the RotationInfo record.
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
info->Assign(0, winfo->type->Ref());
info->Assign(1, new StringVal(new_name.c_str()));
info->Assign(2, new StringVal(winfo->writer->Path().c_str()));
info->Assign(3, new Val(open, TYPE_TIME));
info->Assign(4, new Val(close, TYPE_TIME));
info->Assign(5, new Val(terminating, TYPE_BOOL));
Func* func = winfo->postprocessor;
if ( ! func )
{
ID* id = global_scope()->Lookup("Log::__default_rotation_postprocessor");
assert(id);
func = id->ID_Val()->AsFunc();
}
assert(func);
// Call the postprocessor function.
val_list vl(1);
vl.append(info);
int result = 0;
try
{
Val* v = func->Call(&vl);
result = v->AsBool();
Unref(v);
}
catch ( InterpreterException& e )
{ /* Already reported. */ }
return result;
}