Merge remote-tracking branch 'origin/master' into topic/seth/elasticsearch

Conflicts:
	aux/binpac
	aux/bro-aux
	aux/broccoli
	aux/broctl
	scripts/base/frameworks/logging/__load__.bro
	src/logging.bif
This commit is contained in:
Seth Hall 2012-07-06 12:01:16 -04:00
commit 601d1cf37e
217 changed files with 7146 additions and 2574 deletions

View file

@ -60,6 +60,7 @@ struct Manager::Filter {
string path;
Val* path_val;
EnumVal* writer;
TableVal* config;
bool local;
bool remote;
double interval;
@ -83,6 +84,7 @@ struct Manager::WriterInfo {
double interval;
Func* postprocessor;
WriterFrontend* writer;
WriterBackend::WriterInfo info;
};
struct Manager::Stream {
@ -200,10 +202,10 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
assert(ld->factory);
frontend->ty_name = ld->name;
WriterBackend* backend = (*ld->factory)(frontend);
assert(backend);
frontend->ty_name = ld->name;
return backend;
}
@ -527,6 +529,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Val* log_remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"));
Val* interv = fval->LookupWithDefault(rtype->FieldOffset("interv"));
Val* postprocessor = fval->LookupWithDefault(rtype->FieldOffset("postprocessor"));
Val* config = fval->LookupWithDefault(rtype->FieldOffset("config"));
Filter* filter = new Filter;
filter->name = name->AsString()->CheckString();
@ -538,6 +541,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
filter->remote = log_remote->AsBool();
filter->interval = interv->AsInterval();
filter->postprocessor = postprocessor ? postprocessor->AsFunc() : 0;
filter->config = config->Ref()->AsTableVal();
Unref(name);
Unref(pred);
@ -546,6 +550,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Unref(log_remote);
Unref(interv);
Unref(postprocessor);
Unref(config);
// Build the list of fields that the filter wants included, including
// potentially rolling out fields.
@ -773,8 +778,27 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new threading::Field(*filter->fields[j]);
WriterBackend::WriterInfo info;
info.path = path;
HashKey* k;
IterCookie* c = filter->config->AsTable()->InitForIteration();
TableEntryVal* v;
while ( (v = filter->config->AsTable()->NextEntry(k, c)) )
{
ListVal* index = filter->config->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString();
info.config.insert(std::make_pair(key, value));
Unref(index);
delete k;
}
// CreateWriter() will set the other fields in info.
writer = CreateWriter(stream->id, filter->writer,
path, filter->num_fields,
info, filter->num_fields,
arg_fields, filter->local, filter->remote);
if ( ! writer )
@ -782,7 +806,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
Unref(columns);
return false;
}
}
// Alright, can do the write now.
@ -962,7 +985,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
return vals;
}
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info,
int num_fields, const threading::Field* const* fields, bool local, bool remote)
{
Stream* stream = FindStream(id);
@ -972,25 +995,21 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
return false;
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info.path));
if ( w != stream->writers.end() )
// If we already have a writer for this. That's fine, we just
// return it.
return w->second->writer;
WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
assert(writer_obj);
writer_obj->Init(path, num_fields, fields);
WriterInfo* winfo = new WriterInfo;
winfo->type = writer->Ref()->AsEnumVal();
winfo->writer = writer_obj;
winfo->writer = 0;
winfo->open_time = network_time;
winfo->rotation_timer = 0;
winfo->interval = 0;
winfo->postprocessor = 0;
winfo->info = info;
// Search for a corresponding filter for the writer/path pair and use its
// rotation settings. If no matching filter is found, fall back on
@ -1002,7 +1021,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
{
Filter* f = *it;
if ( f->writer->AsEnum() == writer->AsEnum() &&
f->path == winfo->writer->Path() )
f->path == info.path )
{
found_filter_match = true;
winfo->interval = f->interval;
@ -1018,13 +1037,24 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
winfo->interval = id->ID_Val()->AsInterval();
}
InstallRotationTimer(winfo);
stream->writers.insert(
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path),
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info.path),
winfo));
return writer_obj;
// Still need to set the WriterInfo's rotation parameters, which we
// computed above.
const char* base_time = log_rotate_base_time ?
log_rotate_base_time->AsString()->CheckString() : 0;
winfo->info.rotation_interval = winfo->interval;
winfo->info.rotation_base = parse_rotate_base_time(base_time);
winfo->writer = new WriterFrontend(id, writer, local, remote);
winfo->writer->Init(winfo->info, num_fields, fields);
InstallRotationTimer(winfo);
return winfo->writer;
}
void Manager::DeleteVals(int num_fields, threading::Value** vals)
@ -1102,7 +1132,7 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id,
&writer_val,
i->first.second,
i->second->info,
writer->NumFields(),
writer->Fields());
}
@ -1227,8 +1257,9 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
const char* base_time = log_rotate_base_time ?
log_rotate_base_time->AsString()->CheckString() : 0;
double base = parse_rotate_base_time(base_time);
double delta_t =
calc_next_rotate(rotation_interval, base_time);
calc_next_rotate(network_time, rotation_interval, base);
winfo->rotation_timer =
new RotationTimer(network_time + delta_t, winfo, true);
@ -1237,14 +1268,14 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
timer_mgr->Add(winfo->rotation_timer);
DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f",
winfo->writer->Path().c_str(), winfo->rotation_timer->Time());
winfo->writer->Name().c_str(), winfo->rotation_timer->Time());
}
}
void Manager::Rotate(WriterInfo* winfo)
{
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
winfo->writer->Path().c_str(), network_time);
winfo->writer->Name().c_str(), network_time);
// Build a temporary path for the writer to move the file to.
struct tm tm;
@ -1255,7 +1286,7 @@ void Manager::Rotate(WriterInfo* winfo)
localtime_r(&teatime, &tm);
strftime(buf, sizeof(buf), date_fmt, &tm);
string tmp = string(fmt("%s-%s", winfo->writer->Path().c_str(), buf));
string tmp = string(fmt("%s-%s", winfo->writer->Info().path.c_str(), buf));
// Trigger the rotation.
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
@ -1273,7 +1304,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
return true;
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
writer->Path().c_str(), network_time, new_name.c_str());
writer->Name().c_str(), network_time, new_name.c_str());
WriterInfo* winfo = FindWriter(writer);
if ( ! winfo )
@ -1283,7 +1314,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
info->Assign(0, winfo->type->Ref());
info->Assign(1, new StringVal(new_name.c_str()));
info->Assign(2, new StringVal(winfo->writer->Path().c_str()));
info->Assign(2, new StringVal(winfo->writer->Info().path.c_str()));
info->Assign(3, new Val(open, TYPE_TIME));
info->Assign(4, new Val(close, TYPE_TIME));
info->Assign(5, new Val(terminating, TYPE_BOOL));

View file

@ -9,13 +9,14 @@
#include "../EventHandler.h"
#include "../RemoteSerializer.h"
#include "WriterBackend.h"
class SerializationFormat;
class RemoteSerializer;
class RotationTimer;
namespace logging {
class WriterBackend;
class WriterFrontend;
class RotationFinishedMessage;
@ -162,7 +163,7 @@ protected:
//// Function also used by the RemoteSerializer.
// Takes ownership of fields.
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path,
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info,
int num_fields, const threading::Field* const* fields,
bool local, bool remote);

View file

@ -4,6 +4,7 @@
#include "bro_inet_ntop.h"
#include "threading/SerialTypes.h"
#include "Manager.h"
#include "WriterBackend.h"
#include "WriterFrontend.h"
@ -60,14 +61,61 @@ public:
using namespace logging;
bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt)
{
int size;
if ( ! (fmt->Read(&path, "path") &&
fmt->Read(&rotation_base, "rotation_base") &&
fmt->Read(&rotation_interval, "rotation_interval") &&
fmt->Read(&size, "config_size")) )
return false;
config.clear();
while ( size )
{
string value;
string key;
if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) )
return false;
config.insert(std::make_pair(value, key));
}
return true;
}
bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const
{
int size = config.size();
if ( ! (fmt->Write(path, "path") &&
fmt->Write(rotation_base, "rotation_base") &&
fmt->Write(rotation_interval, "rotation_interval") &&
fmt->Write(size, "config_size")) )
return false;
for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i )
{
if ( ! (fmt->Write(i->first, "config-value") && fmt->Write(i->second, "config-key")) )
return false;
}
return true;
}
WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
{
path = "<path not yet set>";
num_fields = 0;
fields = 0;
buffering = true;
frontend = arg_frontend;
info.path = "<path not yet set>";
SetName(frontend->Name());
}
@ -108,17 +156,17 @@ void WriterBackend::DisableFrontend()
SendOut(new DisableMessage(frontend));
}
bool WriterBackend::Init(string arg_path, int arg_num_fields, const Field* const* arg_fields)
bool WriterBackend::Init(const WriterInfo& arg_info, int arg_num_fields, const Field* const* arg_fields, const string& frontend_name)
{
path = arg_path;
info = arg_info;
num_fields = arg_num_fields;
fields = arg_fields;
string name = Fmt("%s/%s", path.c_str(), frontend->Name().c_str());
string name = Fmt("%s/%s", info.path.c_str(), frontend_name.c_str());
SetName(name);
if ( ! DoInit(arg_path, arg_num_fields, arg_fields) )
if ( ! DoInit(arg_info, arg_num_fields, arg_fields) )
{
DisableFrontend();
return false;

View file

@ -5,12 +5,14 @@
#ifndef LOGGING_WRITERBACKEND_H
#define LOGGING_WRITERBACKEND_H
#include "Manager.h"
#include "threading/MsgThread.h"
class RemoteSerializer;
namespace logging {
class WriterFrontend;
/**
* Base class for writer implementation. When the logging::Manager creates a
* new logging filter, it instantiates a WriterFrontend. That then in turn
@ -41,21 +43,59 @@ public:
*/
virtual ~WriterBackend();
/**
* A struct passing information to the writer at initialization time.
*/
struct WriterInfo
{
typedef std::map<string, string> config_map;
/**
* A string left to the interpretation of the writer
* implementation; it corresponds to the 'path' value configured
* on the script-level for the logging filter.
*/
string path;
/**
* The rotation interval as configured for this writer.
*/
double rotation_interval;
/**
* The parsed value of log_rotate_base_time in seconds.
*/
double rotation_base;
/**
* A map of key/value pairs corresponding to the relevant
* filter's "config" table.
*/
std::map<string, string> config;
private:
friend class ::RemoteSerializer;
// Note, these need to be adapted when changing the struct's
// fields. They serialize/deserialize the struct.
bool Read(SerializationFormat* fmt);
bool Write(SerializationFormat* fmt) const;
};
/**
* One-time initialization of the writer to define the logged fields.
*
* @param path A string left to the interpretation of the writer
* implementation; it corresponds to the value configured on the
* script-level for the logging filter.
*
* @param num_fields The number of log fields for the stream.
* @param info Meta information for the writer.
* @param num_fields
*
* @param fields An array of size \a num_fields with the log fields.
* The methods takes ownership of the array.
*
* @param frontend_name The name of the front-end writer implementation.
*
* @return False if an error occured.
*/
bool Init(string path, int num_fields, const threading::Field* const* fields);
bool Init(const WriterInfo& info, int num_fields, const threading::Field* const* fields, const string& frontend_name);
/**
* Writes one log entry.
@ -108,9 +148,9 @@ public:
void DisableFrontend();
/**
* Returns the log path as passed into the constructor.
* Returns the additional writer information passed into the constructor.
*/
const string Path() const { return path; }
const WriterInfo& Info() const { return info; }
/**
* Returns the number of log fields as passed into the constructor.
@ -185,7 +225,7 @@ protected:
* disabled and eventually deleted. When returning false, an
* implementation should also call Error() to indicate what happened.
*/
virtual bool DoInit(string path, int num_fields,
virtual bool DoInit(const WriterInfo& info, int num_fields,
const threading::Field* const* fields) = 0;
/**
@ -299,7 +339,7 @@ private:
// this class, it's running in a different thread!
WriterFrontend* frontend;
string path; // Log path.
WriterInfo info; // Meta information as passed to Init().
int num_fields; // Number of log fields.
const threading::Field* const* fields; // Log fields.
bool buffering; // True if buffering is enabled.

View file

@ -2,6 +2,7 @@
#include "Net.h"
#include "threading/SerialTypes.h"
#include "Manager.h"
#include "WriterFrontend.h"
#include "WriterBackend.h"
@ -15,16 +16,18 @@ namespace logging {
class InitMessage : public threading::InputMessage<WriterBackend>
{
public:
InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const* fields)
InitMessage(WriterBackend* backend, const WriterBackend::WriterInfo& info, const int num_fields, const Field* const* fields, const string& frontend_name)
: threading::InputMessage<WriterBackend>("Init", backend),
path(path), num_fields(num_fields), fields(fields) { }
info(info), num_fields(num_fields), fields(fields),
frontend_name(frontend_name) { }
virtual bool Process() { return Object()->Init(path, num_fields, fields); }
virtual bool Process() { return Object()->Init(info, num_fields, fields, frontend_name); }
private:
const string path;
WriterBackend::WriterInfo info;
const int num_fields;
const Field * const* fields;
const string frontend_name;
};
class RotateMessage : public threading::InputMessage<WriterBackend>
@ -134,10 +137,10 @@ WriterFrontend::~WriterFrontend()
string WriterFrontend::Name() const
{
if ( path.size() )
if ( ! info.path.size() )
return ty_name;
return ty_name + "/" + path;
return ty_name + "/" + info.path;
}
void WriterFrontend::Stop()
@ -149,7 +152,7 @@ void WriterFrontend::Stop()
backend->Stop();
}
void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields)
void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields)
{
if ( disabled )
return;
@ -157,19 +160,19 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons
if ( initialized )
reporter->InternalError("writer initialize twice");
path = arg_path;
info = arg_info;
num_fields = arg_num_fields;
fields = arg_fields;
initialized = true;
if ( backend )
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
backend->SendIn(new InitMessage(backend, arg_info, arg_num_fields, arg_fields, Name()));
if ( remote )
remote_serializer->SendLogCreateWriter(stream,
writer,
arg_path,
arg_info,
arg_num_fields,
arg_fields);
@ -183,7 +186,7 @@ void WriterFrontend::Write(int num_fields, Value** vals)
if ( remote )
remote_serializer->SendLogWrite(stream,
writer,
path,
info.path,
num_fields,
vals);

View file

@ -3,13 +3,13 @@
#ifndef LOGGING_WRITERFRONTEND_H
#define LOGGING_WRITERFRONTEND_H
#include "Manager.h"
#include "WriterBackend.h"
#include "threading/MsgThread.h"
namespace logging {
class WriterBackend;
class Manager;
/**
* Bridge class between the logging::Manager and backend writer threads. The
@ -68,7 +68,7 @@ public:
*
* This method must only be called from the main thread.
*/
void Init(string path, int num_fields, const threading::Field* const* fields);
void Init(const WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const* fields);
/**
* Write out a record.
@ -169,9 +169,9 @@ public:
bool Disabled() { return disabled; }
/**
* Returns the log path as passed into the constructor.
* Returns the additional writer information as passed into the constructor.
*/
const string Path() const { return path; }
const WriterBackend::WriterInfo& Info() const { return info; }
/**
* Returns the number of log fields as passed into the constructor.
@ -207,7 +207,7 @@ protected:
bool remote; // True if loggin remotely.
string ty_name; // Name of the backend type. Set by the manager.
string path; // The log path.
WriterBackend::WriterInfo info; // The writer information.
int num_fields; // The number of log fields.
const threading::Field* const* fields; // The log fields.

View file

@ -69,8 +69,10 @@ bool Ascii::WriteHeaderField(const string& key, const string& val)
return (fwrite(str.c_str(), str.length(), 1, file) == 1);
}
bool Ascii::DoInit(string path, int num_fields, const Field* const * fields)
bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields)
{
string path = info.path;
if ( output_to_stdout )
path = "/dev/stdout";
@ -290,7 +292,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
Value** vals)
{
if ( ! file )
DoInit(Path(), NumFields(), Fields());
DoInit(Info(), NumFields(), Fields());
desc.Clear();
@ -320,7 +322,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating)
{
// Don't rotate special files or if there's not one currently open.
if ( ! file || IsSpecial(Path()) )
if ( ! file || IsSpecial(Info().path) )
return true;
fclose(file);

View file

@ -19,7 +19,7 @@ public:
static string LogExt();
protected:
virtual bool DoInit(string path, int num_fields,
virtual bool DoInit(const WriterInfo& info, int num_fields,
const threading::Field* const* fields);
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals);

View file

@ -263,7 +263,7 @@ bool DataSeries::OpenLog(string path)
return true;
}
bool DataSeries::DoInit(string path, int num_fields, const threading::Field* const * fields)
bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const * fields)
{
// We first construct an XML schema thing (and, if ds_dump_schema is
// set, dump it to path + ".ds.xml"). Assuming that goes well, we
@ -298,11 +298,11 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
schema_list.push_back(val);
}
string schema = BuildDSSchemaFromFieldTypes(schema_list, path);
string schema = BuildDSSchemaFromFieldTypes(schema_list, info.path);
if( ds_dump_schema )
{
FILE* pFile = fopen ( string(path + ".ds.xml").c_str() , "wb" );
FILE* pFile = fopen ( string(info.path + ".ds.xml").c_str() , "wb" );
if( pFile )
{
@ -340,7 +340,7 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
log_type = log_types.registerTypePtr(schema);
log_series.setType(log_type);
return OpenLog(path);
return OpenLog(info.path);
}
bool DataSeries::DoFlush()
@ -401,7 +401,7 @@ bool DataSeries::DoRotate(string rotated_path, double open, double close, bool t
// size will be (much) larger.
CloseLog();
string dsname = Path() + ".ds";
string dsname = Info().path + ".ds";
string nname = rotated_path + ".ds";
rename(dsname.c_str(), nname.c_str());
@ -411,7 +411,7 @@ bool DataSeries::DoRotate(string rotated_path, double open, double close, bool t
return false;
}
return OpenLog(Path());
return OpenLog(Info().path);
}
bool DataSeries::DoSetBuf(bool enabled)

View file

@ -26,7 +26,7 @@ public:
protected:
// Overidden from WriterBackend.
virtual bool DoInit(string path, int num_fields,
virtual bool DoInit(const WriterInfo& info, int num_fields,
const threading::Field* const * fields);
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,

View file

@ -1,14 +1,41 @@
#include "None.h"
#include "NetVar.h"
using namespace logging;
using namespace writer;
bool None::DoInit(const WriterInfo& info, int num_fields,
const threading::Field* const * fields)
{
if ( BifConst::LogNone::debug )
{
std::cout << "[logging::writer::None]" << std::endl;
std::cout << " path=" << info.path << std::endl;
std::cout << " rotation_interval=" << info.rotation_interval << std::endl;
std::cout << " rotation_base=" << info.rotation_base << std::endl;
for ( std::map<string,string>::const_iterator i = info.config.begin(); i != info.config.end(); i++ )
std::cout << " config[" << i->first << "] = " << i->second << std::endl;
for ( int i = 0; i < num_fields; i++ )
{
const threading::Field* field = fields[i];
std::cout << " field " << field->name << ": "
<< type_name(field->type) << std::endl;
}
std::cout << std::endl;
}
return true;
}
bool None::DoRotate(string rotated_path, double open, double close, bool terminating)
{
if ( ! FinishedRotation(string("/dev/null"), Path(), open, close, terminating))
if ( ! FinishedRotation(string("/dev/null"), Info().path, open, close, terminating))
{
Error(Fmt("error rotating %s", Path().c_str()));
Error(Fmt("error rotating %s", Info().path.c_str()));
return false;
}

View file

@ -18,8 +18,8 @@ public:
{ return new None(frontend); }
protected:
virtual bool DoInit(string path, int num_fields,
const threading::Field* const * fields) { return true; }
virtual bool DoInit(const WriterInfo& info, int num_fields,
const threading::Field* const * fields);
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals) { return true; }
@ -27,7 +27,7 @@ protected:
virtual bool DoRotate(string rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush() { return true; }
virtual bool DoFinish() { return true; }
virtual bool DoFinish() { WriterBackend::DoFinish(); return true; }
};
}