mirror of
https://github.com/zeek/zeek.git
synced 2025-10-04 23:58:20 +00:00
Merge remote-tracking branch 'origin/master' into topic/bernhard/sqlite
Conflicts: scripts/base/frameworks/logging/__load__.bro src/CMakeLists.txt src/logging.bif src/types.bif
This commit is contained in:
commit
da157c8ded
296 changed files with 4703 additions and 2175 deletions
|
@ -6,6 +6,7 @@
|
|||
#include "../EventHandler.h"
|
||||
#include "../NetVar.h"
|
||||
#include "../Net.h"
|
||||
#include "../Type.h"
|
||||
|
||||
#include "threading/Manager.h"
|
||||
#include "threading/SerialTypes.h"
|
||||
|
@ -17,6 +18,10 @@
|
|||
#include "writers/Ascii.h"
|
||||
#include "writers/None.h"
|
||||
|
||||
#ifdef USE_ELASTICSEARCH
|
||||
#include "writers/ElasticSearch.h"
|
||||
#endif
|
||||
|
||||
#ifdef USE_DATASERIES
|
||||
#include "writers/DataSeries.h"
|
||||
#endif
|
||||
|
@ -41,6 +46,11 @@ struct WriterDefinition {
|
|||
WriterDefinition log_writers[] = {
|
||||
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
|
||||
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
|
||||
|
||||
#ifdef USE_ELASTICSEARCH
|
||||
{ BifEnum::Log::WRITER_ELASTICSEARCH, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
|
||||
#endif
|
||||
|
||||
#ifdef USE_DATASERIES
|
||||
{ BifEnum::Log::WRITER_DATASERIES, "DataSeries", 0, writer::DataSeries::Instantiate },
|
||||
#endif
|
||||
|
@ -84,7 +94,8 @@ struct Manager::WriterInfo {
|
|||
double interval;
|
||||
Func* postprocessor;
|
||||
WriterFrontend* writer;
|
||||
WriterBackend::WriterInfo info;
|
||||
WriterBackend::WriterInfo* info;
|
||||
string instantiating_filter;
|
||||
};
|
||||
|
||||
struct Manager::Stream {
|
||||
|
@ -127,6 +138,7 @@ Manager::Stream::~Stream()
|
|||
|
||||
Unref(winfo->type);
|
||||
delete winfo->writer;
|
||||
delete winfo->info;
|
||||
delete winfo;
|
||||
}
|
||||
|
||||
|
@ -205,7 +217,6 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
|
|||
WriterBackend* backend = (*ld->factory)(frontend);
|
||||
assert(backend);
|
||||
|
||||
frontend->ty_name = ld->name;
|
||||
return backend;
|
||||
}
|
||||
|
||||
|
@ -485,18 +496,17 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
|
|||
return false;
|
||||
}
|
||||
|
||||
threading::Field* field = new threading::Field();
|
||||
field->name = new_path;
|
||||
field->type = t->Tag();
|
||||
field->optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
|
||||
TypeTag st = TYPE_VOID;
|
||||
|
||||
if ( field->type == TYPE_TABLE )
|
||||
field->subtype = t->AsSetType()->Indices()->PureType()->Tag();
|
||||
if ( t->Tag() == TYPE_TABLE )
|
||||
st = t->AsSetType()->Indices()->PureType()->Tag();
|
||||
|
||||
else if ( field->type == TYPE_VECTOR )
|
||||
field->subtype = t->AsVectorType()->YieldType()->Tag();
|
||||
else if ( t->Tag() == TYPE_VECTOR )
|
||||
st = t->AsVectorType()->YieldType()->Tag();
|
||||
|
||||
filter->fields[filter->num_fields - 1] = field;
|
||||
bool optional = rt->FieldDecl(i)->FindAttr(ATTR_OPTIONAL);
|
||||
|
||||
filter->fields[filter->num_fields - 1] = new threading::Field(new_path.c_str(), 0, t->Tag(), st, optional);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -603,7 +613,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
|
|||
{
|
||||
threading::Field* field = filter->fields[i];
|
||||
DBG_LOG(DBG_LOGGING, " field %10s: %s",
|
||||
field->name.c_str(), type_name(field->type));
|
||||
field->name, type_name(field->type));
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -764,8 +774,18 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
|||
WriterFrontend* writer = 0;
|
||||
|
||||
if ( w != stream->writers.end() )
|
||||
{
|
||||
if ( w->second->instantiating_filter != filter->name )
|
||||
{
|
||||
reporter->Warning("Skipping write to filter '%s' on path '%s'"
|
||||
" because filter '%s' has already instantiated the same"
|
||||
" writer type for that path", filter->name.c_str(),
|
||||
filter->path.c_str(), w->second->instantiating_filter.c_str());
|
||||
continue;
|
||||
}
|
||||
// We know this writer already.
|
||||
writer = w->second->writer;
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
|
@ -778,8 +798,9 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
|||
for ( int j = 0; j < filter->num_fields; ++j )
|
||||
arg_fields[j] = new threading::Field(*filter->fields[j]);
|
||||
|
||||
WriterBackend::WriterInfo info;
|
||||
info.path = path;
|
||||
WriterBackend::WriterInfo* info = new WriterBackend::WriterInfo;
|
||||
info->path = copy_string(path.c_str());
|
||||
info->network_time = network_time;
|
||||
|
||||
HashKey* k;
|
||||
IterCookie* c = filter->config->AsTable()->InitForIteration();
|
||||
|
@ -790,7 +811,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
|||
ListVal* index = filter->config->RecoverIndex(k);
|
||||
string key = index->Index(0)->AsString()->CheckString();
|
||||
string value = v->Value()->AsString()->CheckString();
|
||||
info.config.insert(std::make_pair(key, value));
|
||||
info->config.insert(std::make_pair(copy_string(key.c_str()), copy_string(value.c_str())));
|
||||
Unref(index);
|
||||
delete k;
|
||||
}
|
||||
|
@ -799,7 +820,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
|||
|
||||
writer = CreateWriter(stream->id, filter->writer,
|
||||
info, filter->num_fields,
|
||||
arg_fields, filter->local, filter->remote);
|
||||
arg_fields, filter->local, filter->remote, filter->name);
|
||||
|
||||
if ( ! writer )
|
||||
{
|
||||
|
@ -852,11 +873,16 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
|
|||
val->Type()->AsEnumType()->Lookup(val->InternalInt());
|
||||
|
||||
if ( s )
|
||||
lval->val.string_val = new string(s);
|
||||
{
|
||||
lval->val.string_val.data = copy_string(s);
|
||||
lval->val.string_val.length = strlen(s);
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
val->Type()->Error("enum type does not contain value", val);
|
||||
lval->val.string_val = new string();
|
||||
lval->val.string_val.data = copy_string("");
|
||||
lval->val.string_val.length = 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -888,15 +914,20 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
|
|||
case TYPE_STRING:
|
||||
{
|
||||
const BroString* s = val->AsString();
|
||||
lval->val.string_val =
|
||||
new string((const char*) s->Bytes(), s->Len());
|
||||
char* buf = new char[s->Len()];
|
||||
memcpy(buf, s->Bytes(), s->Len());
|
||||
|
||||
lval->val.string_val.data = buf;
|
||||
lval->val.string_val.length = s->Len();
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_FILE:
|
||||
{
|
||||
const BroFile* f = val->AsFile();
|
||||
lval->val.string_val = new string(f->Name());
|
||||
string s = f->Name();
|
||||
lval->val.string_val.data = copy_string(s.c_str());
|
||||
lval->val.string_val.length = s.size();
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -905,7 +936,9 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
|
|||
ODesc d;
|
||||
const Func* f = val->AsFunc();
|
||||
f->Describe(&d);
|
||||
lval->val.string_val = new string(d.Description());
|
||||
const char* s = d.Description();
|
||||
lval->val.string_val.data = copy_string(s);
|
||||
lval->val.string_val.length = strlen(s);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -985,34 +1018,33 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
|
|||
return vals;
|
||||
}
|
||||
|
||||
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info,
|
||||
int num_fields, const threading::Field* const* fields, bool local, bool remote)
|
||||
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
|
||||
int num_fields, const threading::Field* const* fields, bool local, bool remote,
|
||||
const string& instantiating_filter)
|
||||
{
|
||||
Stream* stream = FindStream(id);
|
||||
|
||||
if ( ! stream )
|
||||
// Don't know this stream.
|
||||
return false;
|
||||
return 0;
|
||||
|
||||
Stream::WriterMap::iterator w =
|
||||
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info.path));
|
||||
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info->path));
|
||||
|
||||
if ( w != stream->writers.end() )
|
||||
// If we already have a writer for this. That's fine, we just
|
||||
// return it.
|
||||
return w->second->writer;
|
||||
|
||||
WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
|
||||
assert(writer_obj);
|
||||
|
||||
WriterInfo* winfo = new WriterInfo;
|
||||
winfo->type = writer->Ref()->AsEnumVal();
|
||||
winfo->writer = writer_obj;
|
||||
winfo->writer = 0;
|
||||
winfo->open_time = network_time;
|
||||
winfo->rotation_timer = 0;
|
||||
winfo->interval = 0;
|
||||
winfo->postprocessor = 0;
|
||||
winfo->info = info;
|
||||
winfo->instantiating_filter = instantiating_filter;
|
||||
|
||||
// Search for a corresponding filter for the writer/path pair and use its
|
||||
// rotation settings. If no matching filter is found, fall back on
|
||||
|
@ -1024,7 +1056,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer
|
|||
{
|
||||
Filter* f = *it;
|
||||
if ( f->writer->AsEnum() == writer->AsEnum() &&
|
||||
f->path == winfo->writer->info.path )
|
||||
f->path == info->path )
|
||||
{
|
||||
found_filter_match = true;
|
||||
winfo->interval = f->interval;
|
||||
|
@ -1040,10 +1072,8 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer
|
|||
winfo->interval = id->ID_Val()->AsInterval();
|
||||
}
|
||||
|
||||
InstallRotationTimer(winfo);
|
||||
|
||||
stream->writers.insert(
|
||||
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info.path),
|
||||
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info->path),
|
||||
winfo));
|
||||
|
||||
// Still need to set the WriterInfo's rotation parameters, which we
|
||||
|
@ -1051,12 +1081,15 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const Writer
|
|||
const char* base_time = log_rotate_base_time ?
|
||||
log_rotate_base_time->AsString()->CheckString() : 0;
|
||||
|
||||
winfo->info.rotation_interval = winfo->interval;
|
||||
winfo->info.rotation_base = parse_rotate_base_time(base_time);
|
||||
winfo->info->rotation_interval = winfo->interval;
|
||||
winfo->info->rotation_base = parse_rotate_base_time(base_time);
|
||||
|
||||
writer_obj->Init(winfo->info, num_fields, fields);
|
||||
winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote);
|
||||
winfo->writer->Init(num_fields, fields);
|
||||
|
||||
return writer_obj;
|
||||
InstallRotationTimer(winfo);
|
||||
|
||||
return winfo->writer;
|
||||
}
|
||||
|
||||
void Manager::DeleteVals(int num_fields, threading::Value** vals)
|
||||
|
@ -1134,7 +1167,7 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
|
|||
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
|
||||
remote_serializer->SendLogCreateWriter(peer, (*s)->id,
|
||||
&writer_val,
|
||||
i->second->info,
|
||||
*i->second->info,
|
||||
writer->NumFields(),
|
||||
writer->Fields());
|
||||
}
|
||||
|
@ -1167,7 +1200,7 @@ bool Manager::Flush(EnumVal* id)
|
|||
|
||||
for ( Stream::WriterMap::iterator i = stream->writers.begin();
|
||||
i != stream->writers.end(); i++ )
|
||||
i->second->writer->Flush();
|
||||
i->second->writer->Flush(network_time);
|
||||
|
||||
RemoveDisabledWriters(stream);
|
||||
|
||||
|
@ -1270,14 +1303,14 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
|
|||
timer_mgr->Add(winfo->rotation_timer);
|
||||
|
||||
DBG_LOG(DBG_LOGGING, "Scheduled rotation timer for %s to %.6f",
|
||||
winfo->writer->Name().c_str(), winfo->rotation_timer->Time());
|
||||
winfo->writer->Name(), winfo->rotation_timer->Time());
|
||||
}
|
||||
}
|
||||
|
||||
void Manager::Rotate(WriterInfo* winfo)
|
||||
{
|
||||
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
|
||||
winfo->writer->Name().c_str(), network_time);
|
||||
winfo->writer->Name(), network_time);
|
||||
|
||||
// Build a temporary path for the writer to move the file to.
|
||||
struct tm tm;
|
||||
|
@ -1288,15 +1321,14 @@ void Manager::Rotate(WriterInfo* winfo)
|
|||
localtime_r(&teatime, &tm);
|
||||
strftime(buf, sizeof(buf), date_fmt, &tm);
|
||||
|
||||
string tmp = string(fmt("%s-%s", winfo->writer->Info().path.c_str(), buf));
|
||||
|
||||
// Trigger the rotation.
|
||||
const char* tmp = fmt("%s-%s", winfo->writer->Info().path, buf);
|
||||
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
|
||||
|
||||
++rotations_pending;
|
||||
}
|
||||
|
||||
bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
|
||||
bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
|
||||
double open, double close, bool terminating)
|
||||
{
|
||||
--rotations_pending;
|
||||
|
@ -1306,7 +1338,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
|
|||
return true;
|
||||
|
||||
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
|
||||
writer->Name().c_str(), network_time, new_name.c_str());
|
||||
writer->Name(), network_time, new_name);
|
||||
|
||||
WriterInfo* winfo = FindWriter(writer);
|
||||
if ( ! winfo )
|
||||
|
@ -1315,8 +1347,8 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
|
|||
// Create the RotationInfo record.
|
||||
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
|
||||
info->Assign(0, winfo->type->Ref());
|
||||
info->Assign(1, new StringVal(new_name.c_str()));
|
||||
info->Assign(2, new StringVal(winfo->writer->Info().path.c_str()));
|
||||
info->Assign(1, new StringVal(new_name));
|
||||
info->Assign(2, new StringVal(winfo->writer->Info().path));
|
||||
info->Assign(3, new Val(open, TYPE_TIME));
|
||||
info->Assign(4, new Val(close, TYPE_TIME));
|
||||
info->Assign(5, new Val(terminating, TYPE_BOOL));
|
||||
|
|
|
@ -162,10 +162,10 @@ protected:
|
|||
|
||||
//// Function also used by the RemoteSerializer.
|
||||
|
||||
// Takes ownership of fields.
|
||||
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info,
|
||||
// Takes ownership of fields and info.
|
||||
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
|
||||
int num_fields, const threading::Field* const* fields,
|
||||
bool local, bool remote);
|
||||
bool local, bool remote, const string& instantiating_filter="");
|
||||
|
||||
// Takes ownership of values..
|
||||
bool Write(EnumVal* id, EnumVal* writer, string path,
|
||||
|
@ -175,7 +175,7 @@ protected:
|
|||
void SendAllWritersTo(RemoteSerializer::PeerID peer);
|
||||
|
||||
// Signals that a file has been rotated.
|
||||
bool FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
|
||||
bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
|
||||
double open, double close, bool terminating);
|
||||
|
||||
// Deletes the values as passed into Write().
|
||||
|
|
|
@ -18,20 +18,26 @@ namespace logging {
|
|||
class RotationFinishedMessage : public threading::OutputMessage<WriterFrontend>
|
||||
{
|
||||
public:
|
||||
RotationFinishedMessage(WriterFrontend* writer, string new_name, string old_name,
|
||||
RotationFinishedMessage(WriterFrontend* writer, const char* new_name, const char* old_name,
|
||||
double open, double close, bool terminating)
|
||||
: threading::OutputMessage<WriterFrontend>("RotationFinished", writer),
|
||||
new_name(new_name), old_name(old_name), open(open),
|
||||
new_name(copy_string(new_name)), old_name(copy_string(old_name)), open(open),
|
||||
close(close), terminating(terminating) { }
|
||||
|
||||
virtual ~RotationFinishedMessage()
|
||||
{
|
||||
delete [] new_name;
|
||||
delete [] old_name;
|
||||
}
|
||||
|
||||
virtual bool Process()
|
||||
{
|
||||
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating);
|
||||
}
|
||||
|
||||
private:
|
||||
string new_name;
|
||||
string old_name;
|
||||
const char* new_name;
|
||||
const char* old_name;
|
||||
double open;
|
||||
double close;
|
||||
bool terminating;
|
||||
|
@ -65,12 +71,17 @@ bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt)
|
|||
{
|
||||
int size;
|
||||
|
||||
if ( ! (fmt->Read(&path, "path") &&
|
||||
string tmp_path;
|
||||
|
||||
if ( ! (fmt->Read(&tmp_path, "path") &&
|
||||
fmt->Read(&rotation_base, "rotation_base") &&
|
||||
fmt->Read(&rotation_interval, "rotation_interval") &&
|
||||
fmt->Read(&network_time, "network_time") &&
|
||||
fmt->Read(&size, "config_size")) )
|
||||
return false;
|
||||
|
||||
path = copy_string(tmp_path.c_str());
|
||||
|
||||
config.clear();
|
||||
|
||||
while ( size )
|
||||
|
@ -81,7 +92,7 @@ bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt)
|
|||
if ( ! (fmt->Read(&value, "config-value") && fmt->Read(&value, "config-key")) )
|
||||
return false;
|
||||
|
||||
config.insert(std::make_pair(value, key));
|
||||
config.insert(std::make_pair(copy_string(value.c_str()), copy_string(key.c_str())));
|
||||
}
|
||||
|
||||
return true;
|
||||
|
@ -95,10 +106,11 @@ bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const
|
|||
if ( ! (fmt->Write(path, "path") &&
|
||||
fmt->Write(rotation_base, "rotation_base") &&
|
||||
fmt->Write(rotation_interval, "rotation_interval") &&
|
||||
fmt->Write(network_time, "network_time") &&
|
||||
fmt->Write(size, "config_size")) )
|
||||
return false;
|
||||
|
||||
for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i )
|
||||
for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i )
|
||||
{
|
||||
if ( ! (fmt->Write(i->first, "config-value") && fmt->Write(i->second, "config-key")) )
|
||||
return false;
|
||||
|
@ -113,8 +125,7 @@ WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
|
|||
fields = 0;
|
||||
buffering = true;
|
||||
frontend = arg_frontend;
|
||||
|
||||
info.path = "<path not yet set>";
|
||||
info = new WriterInfo(frontend->Info());
|
||||
|
||||
SetName(frontend->Name());
|
||||
}
|
||||
|
@ -128,6 +139,8 @@ WriterBackend::~WriterBackend()
|
|||
|
||||
delete [] fields;
|
||||
}
|
||||
|
||||
delete info;
|
||||
}
|
||||
|
||||
void WriterBackend::DeleteVals(int num_writes, Value*** vals)
|
||||
|
@ -144,7 +157,7 @@ void WriterBackend::DeleteVals(int num_writes, Value*** vals)
|
|||
delete [] vals;
|
||||
}
|
||||
|
||||
bool WriterBackend::FinishedRotation(string new_name, string old_name,
|
||||
bool WriterBackend::FinishedRotation(const char* new_name, const char* old_name,
|
||||
double open, double close, bool terminating)
|
||||
{
|
||||
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating));
|
||||
|
@ -156,17 +169,12 @@ void WriterBackend::DisableFrontend()
|
|||
SendOut(new DisableMessage(frontend));
|
||||
}
|
||||
|
||||
bool WriterBackend::Init(const WriterInfo& arg_info, int arg_num_fields, const Field* const* arg_fields)
|
||||
bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields)
|
||||
{
|
||||
info = arg_info;
|
||||
num_fields = arg_num_fields;
|
||||
fields = arg_fields;
|
||||
|
||||
string name = Fmt("%s/%s", info.path.c_str(), frontend->Name().c_str());
|
||||
|
||||
SetName(name);
|
||||
|
||||
if ( ! DoInit(arg_info, arg_num_fields, arg_fields) )
|
||||
if ( ! DoInit(*info, arg_num_fields, arg_fields) )
|
||||
{
|
||||
DisableFrontend();
|
||||
return false;
|
||||
|
@ -193,7 +201,6 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals)
|
|||
return false;
|
||||
}
|
||||
|
||||
#ifdef DEBUG
|
||||
// Double-check all the types match.
|
||||
for ( int j = 0; j < num_writes; j++ )
|
||||
{
|
||||
|
@ -201,17 +208,17 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals)
|
|||
{
|
||||
if ( vals[j][i]->type != fields[i]->type )
|
||||
{
|
||||
#ifdef DEBUG
|
||||
const char* msg = Fmt("Field type doesn't match in WriterBackend::Write() (%d vs. %d)",
|
||||
vals[j][i]->type, fields[i]->type);
|
||||
Debug(DBG_LOGGING, msg);
|
||||
|
||||
#endif
|
||||
DisableFrontend();
|
||||
DeleteVals(num_writes, vals);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
bool success = true;
|
||||
|
||||
|
@ -248,7 +255,7 @@ bool WriterBackend::SetBuf(bool enabled)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool WriterBackend::Rotate(string rotated_path, double open,
|
||||
bool WriterBackend::Rotate(const char* rotated_path, double open,
|
||||
double close, bool terminating)
|
||||
{
|
||||
if ( ! DoRotate(rotated_path, open, close, terminating) )
|
||||
|
@ -260,9 +267,9 @@ bool WriterBackend::Rotate(string rotated_path, double open,
|
|||
return true;
|
||||
}
|
||||
|
||||
bool WriterBackend::Flush()
|
||||
bool WriterBackend::Flush(double network_time)
|
||||
{
|
||||
if ( ! DoFlush() )
|
||||
if ( ! DoFlush(network_time) )
|
||||
{
|
||||
DisableFrontend();
|
||||
return false;
|
||||
|
@ -271,13 +278,15 @@ bool WriterBackend::Flush()
|
|||
return true;
|
||||
}
|
||||
|
||||
bool WriterBackend::DoHeartbeat(double network_time, double current_time)
|
||||
bool WriterBackend::OnFinish(double network_time)
|
||||
{
|
||||
MsgThread::DoHeartbeat(network_time, current_time);
|
||||
return DoFinish(network_time);
|
||||
}
|
||||
|
||||
bool WriterBackend::OnHeartbeat(double network_time, double current_time)
|
||||
{
|
||||
SendOut(new FlushWriteBufferMessage(frontend));
|
||||
|
||||
return true;
|
||||
return DoHeartbeat(network_time, current_time);
|
||||
}
|
||||
|
||||
string WriterBackend::Render(const threading::Value::addr_t& addr) const
|
||||
|
|
|
@ -48,14 +48,17 @@ public:
|
|||
*/
|
||||
struct WriterInfo
|
||||
{
|
||||
typedef std::map<string, string> config_map;
|
||||
// Structure takes ownership of these strings.
|
||||
typedef std::map<const char*, const char*, CompareString> config_map;
|
||||
|
||||
/**
|
||||
* A string left to the interpretation of the writer
|
||||
* implementation; it corresponds to the value configured on
|
||||
* the script-level for the logging filter.
|
||||
* implementation; it corresponds to the 'path' value configured
|
||||
* on the script-level for the logging filter.
|
||||
*
|
||||
* Structure takes ownership of string.
|
||||
*/
|
||||
string path;
|
||||
const char* path;
|
||||
|
||||
/**
|
||||
* The rotation interval as configured for this writer.
|
||||
|
@ -67,13 +70,47 @@ public:
|
|||
*/
|
||||
double rotation_base;
|
||||
|
||||
/**
|
||||
* The network time when the writer is created.
|
||||
*/
|
||||
double network_time;
|
||||
|
||||
/**
|
||||
* A map of key/value pairs corresponding to the relevant
|
||||
* filter's "config" table.
|
||||
*/
|
||||
std::map<string, string> config;
|
||||
config_map config;
|
||||
|
||||
WriterInfo() : path(0), rotation_interval(0.0), rotation_base(0.0),
|
||||
network_time(0.0)
|
||||
{
|
||||
}
|
||||
|
||||
WriterInfo(const WriterInfo& other)
|
||||
{
|
||||
path = other.path ? copy_string(other.path) : 0;
|
||||
rotation_interval = other.rotation_interval;
|
||||
rotation_base = other.rotation_base;
|
||||
network_time = other.network_time;
|
||||
|
||||
for ( config_map::const_iterator i = other.config.begin(); i != other.config.end(); i++ )
|
||||
config.insert(std::make_pair(copy_string(i->first), copy_string(i->second)));
|
||||
}
|
||||
|
||||
~WriterInfo()
|
||||
{
|
||||
delete [] path;
|
||||
|
||||
for ( config_map::iterator i = config.begin(); i != config.end(); i++ )
|
||||
{
|
||||
delete [] i->first;
|
||||
delete [] i->second;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
const WriterInfo& operator=(const WriterInfo& other); // Disable.
|
||||
|
||||
friend class ::RemoteSerializer;
|
||||
|
||||
// Note, these need to be adapted when changing the struct's
|
||||
|
@ -85,15 +122,16 @@ public:
|
|||
/**
|
||||
* One-time initialization of the writer to define the logged fields.
|
||||
*
|
||||
* @param info Meta information for the writer.
|
||||
* @param num_fields
|
||||
* @param num_fields
|
||||
*
|
||||
* @param fields An array of size \a num_fields with the log fields.
|
||||
* The methods takes ownership of the array.
|
||||
*
|
||||
* @param frontend_name The name of the front-end writer implementation.
|
||||
*
|
||||
* @return False if an error occured.
|
||||
*/
|
||||
bool Init(const WriterInfo& info, int num_fields, const threading::Field* const* fields);
|
||||
bool Init(int num_fields, const threading::Field* const* fields);
|
||||
|
||||
/**
|
||||
* Writes one log entry.
|
||||
|
@ -127,9 +165,11 @@ public:
|
|||
* Flushes any currently buffered output, assuming the writer
|
||||
* supports that. (If not, it will be ignored).
|
||||
*
|
||||
* @param network_time The network time when the flush was triggered.
|
||||
*
|
||||
* @return False if an error occured.
|
||||
*/
|
||||
bool Flush();
|
||||
bool Flush(double network_time);
|
||||
|
||||
/**
|
||||
* Triggers rotation, if the writer supports that. (If not, it will
|
||||
|
@ -137,7 +177,7 @@ public:
|
|||
*
|
||||
* @return False if an error occured.
|
||||
*/
|
||||
bool Rotate(string rotated_path, double open, double close, bool terminating);
|
||||
bool Rotate(const char* rotated_path, double open, double close, bool terminating);
|
||||
|
||||
/**
|
||||
* Disables the frontend that has instantiated this backend. Once
|
||||
|
@ -146,9 +186,9 @@ public:
|
|||
void DisableFrontend();
|
||||
|
||||
/**
|
||||
* Returns the additional writer information into the constructor.
|
||||
* Returns the additional writer information passed into the constructor.
|
||||
*/
|
||||
const WriterInfo& Info() const { return info; }
|
||||
const WriterInfo& Info() const { return *info; }
|
||||
|
||||
/**
|
||||
* Returns the number of log fields as passed into the constructor.
|
||||
|
@ -184,7 +224,7 @@ public:
|
|||
* @param terminating: True if the original rotation request occured
|
||||
* due to the main Bro process shutting down.
|
||||
*/
|
||||
bool FinishedRotation(string new_name, string old_name,
|
||||
bool FinishedRotation(const char* new_name, const char* old_name,
|
||||
double open, double close, bool terminating);
|
||||
|
||||
/** Helper method to render an IP address as a string.
|
||||
|
@ -211,6 +251,10 @@ public:
|
|||
*/
|
||||
string Render(double d) const;
|
||||
|
||||
// Overridden from MsgThread.
|
||||
virtual bool OnHeartbeat(double network_time, double current_time);
|
||||
virtual bool OnFinish(double network_time);
|
||||
|
||||
protected:
|
||||
friend class FinishMessage;
|
||||
|
||||
|
@ -270,8 +314,10 @@ protected:
|
|||
* will then be disabled and eventually deleted. When returning
|
||||
* false, an implementation should also call Error() to indicate what
|
||||
* happened.
|
||||
*
|
||||
* @param network_time The network time when the flush was triggered.
|
||||
*/
|
||||
virtual bool DoFlush() = 0;
|
||||
virtual bool DoFlush(double network_time) = 0;
|
||||
|
||||
/**
|
||||
* Writer-specific method implementing log rotation. Most directly
|
||||
|
@ -307,25 +353,24 @@ protected:
|
|||
* due the main Bro prcoess terminating (and not because we've
|
||||
* reached a regularly scheduled time for rotation).
|
||||
*/
|
||||
virtual bool DoRotate(string rotated_path, double open, double close,
|
||||
virtual bool DoRotate(const char* rotated_path, double open, double close,
|
||||
bool terminating) = 0;
|
||||
|
||||
/**
|
||||
* Writer-specific method called just before the threading system is
|
||||
* going to shutdown.
|
||||
* going to shutdown. It is assumed that once this messages returns,
|
||||
* the thread can be safely terminated.
|
||||
*
|
||||
* This method can be overridden but one must call
|
||||
* WriterBackend::DoFinish().
|
||||
* @param network_time The network time when the finish is triggered.
|
||||
*/
|
||||
virtual bool DoFinish() { return MsgThread::DoFinish(); }
|
||||
|
||||
virtual bool DoFinish(double network_time) = 0;
|
||||
/**
|
||||
* Triggered by regular heartbeat messages from the main thread.
|
||||
*
|
||||
* This method can be overridden but one must call
|
||||
* WriterBackend::DoHeartbeat().
|
||||
* This method can be overridden. Default implementation does
|
||||
* nothing.
|
||||
*/
|
||||
virtual bool DoHeartbeat(double network_time, double current_time);
|
||||
virtual bool DoHeartbeat(double network_time, double current_time) = 0;
|
||||
|
||||
private:
|
||||
/**
|
||||
|
@ -337,7 +382,7 @@ private:
|
|||
// this class, it's running in a different thread!
|
||||
WriterFrontend* frontend;
|
||||
|
||||
WriterInfo info; // Meta information as passed to Init().
|
||||
const WriterInfo* info; // Meta information.
|
||||
int num_fields; // Number of log fields.
|
||||
const threading::Field* const* fields; // Log fields.
|
||||
bool buffering; // True if buffering is enabled.
|
||||
|
|
|
@ -16,14 +16,15 @@ namespace logging {
|
|||
class InitMessage : public threading::InputMessage<WriterBackend>
|
||||
{
|
||||
public:
|
||||
InitMessage(WriterBackend* backend, const WriterBackend::WriterInfo& info, const int num_fields, const Field* const* fields)
|
||||
InitMessage(WriterBackend* backend, const int num_fields, const Field* const* fields)
|
||||
: threading::InputMessage<WriterBackend>("Init", backend),
|
||||
info(info), num_fields(num_fields), fields(fields) { }
|
||||
num_fields(num_fields), fields(fields)
|
||||
{}
|
||||
|
||||
virtual bool Process() { return Object()->Init(info, num_fields, fields); }
|
||||
|
||||
virtual bool Process() { return Object()->Init(num_fields, fields); }
|
||||
|
||||
private:
|
||||
WriterBackend::WriterInfo info;
|
||||
const int num_fields;
|
||||
const Field * const* fields;
|
||||
};
|
||||
|
@ -31,18 +32,20 @@ private:
|
|||
class RotateMessage : public threading::InputMessage<WriterBackend>
|
||||
{
|
||||
public:
|
||||
RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const string rotated_path, const double open,
|
||||
RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const char* rotated_path, const double open,
|
||||
const double close, const bool terminating)
|
||||
: threading::InputMessage<WriterBackend>("Rotate", backend),
|
||||
frontend(frontend),
|
||||
rotated_path(rotated_path), open(open),
|
||||
rotated_path(copy_string(rotated_path)), open(open),
|
||||
close(close), terminating(terminating) { }
|
||||
|
||||
virtual ~RotateMessage() { delete [] rotated_path; }
|
||||
|
||||
virtual bool Process() { return Object()->Rotate(rotated_path, open, close, terminating); }
|
||||
|
||||
private:
|
||||
WriterFrontend* frontend;
|
||||
const string rotated_path;
|
||||
const char* rotated_path;
|
||||
const double open;
|
||||
const double close;
|
||||
const bool terminating;
|
||||
|
@ -79,19 +82,13 @@ private:
|
|||
class FlushMessage : public threading::InputMessage<WriterBackend>
|
||||
{
|
||||
public:
|
||||
FlushMessage(WriterBackend* backend)
|
||||
: threading::InputMessage<WriterBackend>("Flush", backend) {}
|
||||
FlushMessage(WriterBackend* backend, double network_time)
|
||||
: threading::InputMessage<WriterBackend>("Flush", backend),
|
||||
network_time(network_time) {}
|
||||
|
||||
virtual bool Process() { return Object()->Flush(); }
|
||||
};
|
||||
|
||||
class FinishMessage : public threading::InputMessage<WriterBackend>
|
||||
{
|
||||
public:
|
||||
FinishMessage(WriterBackend* backend)
|
||||
: threading::InputMessage<WriterBackend>("Finish", backend) {}
|
||||
|
||||
virtual bool Process() { return Object()->DoFinish(); }
|
||||
virtual bool Process() { return Object()->Flush(network_time); }
|
||||
private:
|
||||
double network_time;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -100,7 +97,7 @@ public:
|
|||
|
||||
using namespace logging;
|
||||
|
||||
WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote)
|
||||
WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote)
|
||||
{
|
||||
stream = arg_stream;
|
||||
writer = arg_writer;
|
||||
|
@ -113,7 +110,10 @@ WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool ar
|
|||
remote = arg_remote;
|
||||
write_buffer = 0;
|
||||
write_buffer_pos = 0;
|
||||
ty_name = "<not set>";
|
||||
info = new WriterBackend::WriterInfo(arg_info);
|
||||
|
||||
const char* w = arg_writer->Type()->AsEnumType()->Lookup(arg_writer->InternalInt());
|
||||
name = copy_string(fmt("%s/%s", arg_info.path, w));
|
||||
|
||||
if ( local )
|
||||
{
|
||||
|
@ -131,26 +131,16 @@ WriterFrontend::~WriterFrontend()
|
|||
{
|
||||
Unref(stream);
|
||||
Unref(writer);
|
||||
}
|
||||
|
||||
string WriterFrontend::Name() const
|
||||
{
|
||||
if ( info.path.size() )
|
||||
return ty_name;
|
||||
|
||||
return ty_name + "/" + info.path;
|
||||
delete info;
|
||||
}
|
||||
|
||||
void WriterFrontend::Stop()
|
||||
{
|
||||
FlushWriteBuffer();
|
||||
SetDisable();
|
||||
|
||||
if ( backend )
|
||||
backend->Stop();
|
||||
}
|
||||
|
||||
void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields)
|
||||
void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields)
|
||||
{
|
||||
if ( disabled )
|
||||
return;
|
||||
|
@ -158,19 +148,18 @@ void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num
|
|||
if ( initialized )
|
||||
reporter->InternalError("writer initialize twice");
|
||||
|
||||
info = arg_info;
|
||||
num_fields = arg_num_fields;
|
||||
fields = arg_fields;
|
||||
|
||||
initialized = true;
|
||||
|
||||
if ( backend )
|
||||
backend->SendIn(new InitMessage(backend, arg_info, arg_num_fields, arg_fields));
|
||||
backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields));
|
||||
|
||||
if ( remote )
|
||||
remote_serializer->SendLogCreateWriter(stream,
|
||||
writer,
|
||||
arg_info,
|
||||
*info,
|
||||
arg_num_fields,
|
||||
arg_fields);
|
||||
|
||||
|
@ -184,7 +173,7 @@ void WriterFrontend::Write(int num_fields, Value** vals)
|
|||
if ( remote )
|
||||
remote_serializer->SendLogWrite(stream,
|
||||
writer,
|
||||
info.path,
|
||||
info->path,
|
||||
num_fields,
|
||||
vals);
|
||||
|
||||
|
@ -238,7 +227,7 @@ void WriterFrontend::SetBuf(bool enabled)
|
|||
FlushWriteBuffer();
|
||||
}
|
||||
|
||||
void WriterFrontend::Flush()
|
||||
void WriterFrontend::Flush(double network_time)
|
||||
{
|
||||
if ( disabled )
|
||||
return;
|
||||
|
@ -246,10 +235,10 @@ void WriterFrontend::Flush()
|
|||
FlushWriteBuffer();
|
||||
|
||||
if ( backend )
|
||||
backend->SendIn(new FlushMessage(backend));
|
||||
backend->SendIn(new FlushMessage(backend, network_time));
|
||||
}
|
||||
|
||||
void WriterFrontend::Rotate(string rotated_path, double open, double close, bool terminating)
|
||||
void WriterFrontend::Rotate(const char* rotated_path, double open, double close, bool terminating)
|
||||
{
|
||||
if ( disabled )
|
||||
return;
|
||||
|
@ -264,17 +253,6 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool
|
|||
log_mgr->FinishedRotation(0, "", rotated_path, open, close, terminating);
|
||||
}
|
||||
|
||||
void WriterFrontend::Finish()
|
||||
{
|
||||
if ( disabled )
|
||||
return;
|
||||
|
||||
FlushWriteBuffer();
|
||||
|
||||
if ( backend )
|
||||
backend->SendIn(new FinishMessage(backend));
|
||||
}
|
||||
|
||||
void WriterFrontend::DeleteVals(Value** vals)
|
||||
{
|
||||
// Note this code is duplicated in Manager::DeleteVals().
|
||||
|
|
|
@ -31,7 +31,11 @@ public:
|
|||
* script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The
|
||||
* frontend will internally instantiate a WriterBackend of the
|
||||
* corresponding type.
|
||||
*
|
||||
*
|
||||
* info: The meta information struct for the writer.
|
||||
*
|
||||
* writer_name: A descriptive name for the writer's type.
|
||||
*
|
||||
* local: If true, the writer will instantiate a local backend.
|
||||
*
|
||||
* remote: If true, the writer will forward all data to remote
|
||||
|
@ -39,7 +43,7 @@ public:
|
|||
*
|
||||
* Frontends must only be instantiated by the main thread.
|
||||
*/
|
||||
WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote);
|
||||
WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote);
|
||||
|
||||
/**
|
||||
* Destructor.
|
||||
|
@ -50,7 +54,7 @@ public:
|
|||
|
||||
/**
|
||||
* Stops all output to this writer. Calling this methods disables all
|
||||
* message forwarding to the backend and stops the backend thread.
|
||||
* message forwarding to the backend.
|
||||
*
|
||||
* This method must only be called from the main thread.
|
||||
*/
|
||||
|
@ -68,7 +72,7 @@ public:
|
|||
*
|
||||
* This method must only be called from the main thread.
|
||||
*/
|
||||
void Init(const WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const* fields);
|
||||
void Init(int num_fields, const threading::Field* const* fields);
|
||||
|
||||
/**
|
||||
* Write out a record.
|
||||
|
@ -114,8 +118,10 @@ public:
|
|||
* message back that will asynchronously call Disable().
|
||||
*
|
||||
* This method must only be called from the main thread.
|
||||
*
|
||||
* @param network_time The network time when the flush was triggered.
|
||||
*/
|
||||
void Flush();
|
||||
void Flush(double network_time);
|
||||
|
||||
/**
|
||||
* Triggers log rotation.
|
||||
|
@ -128,7 +134,7 @@ public:
|
|||
*
|
||||
* This method must only be called from the main thread.
|
||||
*/
|
||||
void Rotate(string rotated_path, double open, double close, bool terminating);
|
||||
void Rotate(const char* rotated_path, double open, double close, bool terminating);
|
||||
|
||||
/**
|
||||
* Finalizes writing to this tream.
|
||||
|
@ -138,8 +144,10 @@ public:
|
|||
* sends a message back that will asynchronously call Disable().
|
||||
*
|
||||
* This method must only be called from the main thread.
|
||||
*
|
||||
* @param network_time The network time when the finish was triggered.
|
||||
*/
|
||||
void Finish();
|
||||
void Finish(double network_time);
|
||||
|
||||
/**
|
||||
* Explicitly triggers a transfer of all potentially buffered Write()
|
||||
|
@ -171,7 +179,7 @@ public:
|
|||
/**
|
||||
* Returns the additional writer information as passed into the constructor.
|
||||
*/
|
||||
const WriterBackend::WriterInfo& Info() const { return info; }
|
||||
const WriterBackend::WriterInfo& Info() const { return *info; }
|
||||
|
||||
/**
|
||||
* Returns the number of log fields as passed into the constructor.
|
||||
|
@ -184,7 +192,7 @@ public:
|
|||
*
|
||||
* This method is safe to call from any thread.
|
||||
*/
|
||||
string Name() const;
|
||||
const char* Name() const { return name; }
|
||||
|
||||
/**
|
||||
* Returns the log fields as passed into the constructor.
|
||||
|
@ -206,8 +214,8 @@ protected:
|
|||
bool local; // True if logging locally.
|
||||
bool remote; // True if loggin remotely.
|
||||
|
||||
string ty_name; // Name of the backend type. Set by the manager.
|
||||
WriterBackend::WriterInfo info; // The writer information.
|
||||
const char* name; // Descriptive name of the
|
||||
WriterBackend::WriterInfo* info; // The writer information.
|
||||
int num_fields; // The number of log fields.
|
||||
const threading::Field* const* fields; // The log fields.
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
#include <string>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "NetVar.h"
|
||||
#include "threading/SerialTypes.h"
|
||||
|
@ -15,10 +17,11 @@ using threading::Field;
|
|||
|
||||
Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
|
||||
{
|
||||
file = 0;
|
||||
fd = 0;
|
||||
ascii_done = false;
|
||||
|
||||
output_to_stdout = BifConst::LogAscii::output_to_stdout;
|
||||
include_header = BifConst::LogAscii::include_header;
|
||||
include_meta = BifConst::LogAscii::include_meta;
|
||||
|
||||
separator_len = BifConst::LogAscii::separator->Len();
|
||||
separator = new char[separator_len];
|
||||
|
@ -40,10 +43,10 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
|
|||
memcpy(unset_field, BifConst::LogAscii::unset_field->Bytes(),
|
||||
unset_field_len);
|
||||
|
||||
header_prefix_len = BifConst::LogAscii::header_prefix->Len();
|
||||
header_prefix = new char[header_prefix_len];
|
||||
memcpy(header_prefix, BifConst::LogAscii::header_prefix->Bytes(),
|
||||
header_prefix_len);
|
||||
meta_prefix_len = BifConst::LogAscii::meta_prefix->Len();
|
||||
meta_prefix = new char[meta_prefix_len];
|
||||
memcpy(meta_prefix, BifConst::LogAscii::meta_prefix->Bytes(),
|
||||
meta_prefix_len);
|
||||
|
||||
desc.EnableEscaping();
|
||||
desc.AddEscapeSequence(separator, separator_len);
|
||||
|
@ -51,26 +54,46 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
|
|||
|
||||
Ascii::~Ascii()
|
||||
{
|
||||
if ( file )
|
||||
fclose(file);
|
||||
if ( ! ascii_done )
|
||||
{
|
||||
fprintf(stderr, "internal error: finish missing\n");
|
||||
abort();
|
||||
}
|
||||
|
||||
delete [] separator;
|
||||
delete [] set_separator;
|
||||
delete [] empty_field;
|
||||
delete [] unset_field;
|
||||
delete [] header_prefix;
|
||||
delete [] meta_prefix;
|
||||
}
|
||||
|
||||
bool Ascii::WriteHeaderField(const string& key, const string& val)
|
||||
{
|
||||
string str = string(header_prefix, header_prefix_len) +
|
||||
string str = string(meta_prefix, meta_prefix_len) +
|
||||
key + string(separator, separator_len) + val + "\n";
|
||||
|
||||
return (fwrite(str.c_str(), str.length(), 1, file) == 1);
|
||||
return safe_write(fd, str.c_str(), str.length());
|
||||
}
|
||||
|
||||
void Ascii::CloseFile(double t)
|
||||
{
|
||||
if ( ! fd )
|
||||
return;
|
||||
|
||||
if ( include_meta )
|
||||
{
|
||||
string ts = t ? Timestamp(t) : string("<abnormal termination>");
|
||||
WriteHeaderField("end", ts);
|
||||
}
|
||||
|
||||
close(fd);
|
||||
fd = 0;
|
||||
}
|
||||
|
||||
bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields)
|
||||
{
|
||||
assert(! fd);
|
||||
|
||||
string path = info.path;
|
||||
|
||||
if ( output_to_stdout )
|
||||
|
@ -78,34 +101,39 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
|
|||
|
||||
fname = IsSpecial(path) ? path : path + "." + LogExt();
|
||||
|
||||
if ( ! (file = fopen(fname.c_str(), "w")) )
|
||||
fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
|
||||
|
||||
if ( fd < 0 )
|
||||
{
|
||||
Error(Fmt("cannot open %s: %s", fname.c_str(),
|
||||
strerror(errno)));
|
||||
|
||||
Strerror(errno)));
|
||||
fd = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
if ( include_header )
|
||||
if ( include_meta )
|
||||
{
|
||||
string names;
|
||||
string types;
|
||||
|
||||
string str = string(header_prefix, header_prefix_len)
|
||||
string str = string(meta_prefix, meta_prefix_len)
|
||||
+ "separator " // Always use space as separator here.
|
||||
+ get_escaped_string(string(separator, separator_len), false)
|
||||
+ "\n";
|
||||
|
||||
if( fwrite(str.c_str(), str.length(), 1, file) != 1 )
|
||||
if ( ! safe_write(fd, str.c_str(), str.length()) )
|
||||
goto write_error;
|
||||
|
||||
string ts = Timestamp(info.network_time);
|
||||
|
||||
if ( ! (WriteHeaderField("set_separator", get_escaped_string(
|
||||
string(set_separator, set_separator_len), false)) &&
|
||||
WriteHeaderField("empty_field", get_escaped_string(
|
||||
string(empty_field, empty_field_len), false)) &&
|
||||
WriteHeaderField("unset_field", get_escaped_string(
|
||||
string(unset_field, unset_field_len), false)) &&
|
||||
WriteHeaderField("path", get_escaped_string(path, false))) )
|
||||
WriteHeaderField("path", get_escaped_string(path, false)) &&
|
||||
WriteHeaderField("start", ts)) )
|
||||
goto write_error;
|
||||
|
||||
for ( int i = 0; i < num_fields; ++i )
|
||||
|
@ -116,8 +144,8 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
|
|||
types += string(separator, separator_len);
|
||||
}
|
||||
|
||||
names += fields[i]->name;
|
||||
types += fields[i]->TypeName();
|
||||
names += string(fields[i]->name);
|
||||
types += fields[i]->TypeName().c_str();
|
||||
}
|
||||
|
||||
if ( ! (WriteHeaderField("fields", names)
|
||||
|
@ -128,21 +156,32 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
|
|||
return true;
|
||||
|
||||
write_error:
|
||||
Error(Fmt("error writing to %s: %s", fname.c_str(), strerror(errno)));
|
||||
Error(Fmt("error writing to %s: %s", fname.c_str(), Strerror(errno)));
|
||||
return false;
|
||||
}
|
||||
|
||||
bool Ascii::DoFlush()
|
||||
bool Ascii::DoFlush(double network_time)
|
||||
{
|
||||
fflush(file);
|
||||
fsync(fd);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Ascii::DoFinish()
|
||||
bool Ascii::DoFinish(double network_time)
|
||||
{
|
||||
return WriterBackend::DoFinish();
|
||||
if ( ascii_done )
|
||||
{
|
||||
fprintf(stderr, "internal error: duplicate finish\n");
|
||||
abort();
|
||||
}
|
||||
|
||||
ascii_done = true;
|
||||
|
||||
CloseFile(network_time);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
|
||||
{
|
||||
if ( ! val->present )
|
||||
|
@ -198,8 +237,8 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
|
|||
case TYPE_FILE:
|
||||
case TYPE_FUNC:
|
||||
{
|
||||
int size = val->val.string_val->size();
|
||||
const char* data = val->val.string_val->data();
|
||||
int size = val->val.string_val.length;
|
||||
const char* data = val->val.string_val.data;
|
||||
|
||||
if ( ! size )
|
||||
{
|
||||
|
@ -280,8 +319,7 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
|
|||
}
|
||||
|
||||
default:
|
||||
Error(Fmt("unsupported field format %d for %s", val->type,
|
||||
field->name.c_str()));
|
||||
Error(Fmt("unsupported field format %d for %s", val->type, field->name));
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -291,7 +329,7 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
|
|||
bool Ascii::DoWrite(int num_fields, const Field* const * fields,
|
||||
Value** vals)
|
||||
{
|
||||
if ( ! file )
|
||||
if ( ! fd )
|
||||
DoInit(Info(), NumFields(), Fields());
|
||||
|
||||
desc.Clear();
|
||||
|
@ -307,31 +345,47 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
|
|||
|
||||
desc.AddRaw("\n", 1);
|
||||
|
||||
if ( fwrite(desc.Bytes(), desc.Len(), 1, file) != 1 )
|
||||
const char* bytes = (const char*)desc.Bytes();
|
||||
int len = desc.Len();
|
||||
|
||||
if ( strncmp(bytes, meta_prefix, meta_prefix_len) == 0 )
|
||||
{
|
||||
Error(Fmt("error writing to %s: %s", fname.c_str(), strerror(errno)));
|
||||
return false;
|
||||
// It would so escape the first character.
|
||||
char buf[16];
|
||||
snprintf(buf, sizeof(buf), "\\x%02x", bytes[0]);
|
||||
|
||||
if ( ! safe_write(fd, buf, strlen(buf)) )
|
||||
goto write_error;
|
||||
|
||||
++bytes;
|
||||
--len;
|
||||
}
|
||||
|
||||
if ( IsBuf() )
|
||||
fflush(file);
|
||||
if ( ! safe_write(fd, bytes, len) )
|
||||
goto write_error;
|
||||
|
||||
if ( IsBuf() )
|
||||
fsync(fd);
|
||||
|
||||
return true;
|
||||
|
||||
write_error:
|
||||
Error(Fmt("error writing to %s: %s", fname.c_str(), Strerror(errno)));
|
||||
return false;
|
||||
}
|
||||
|
||||
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating)
|
||||
bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool terminating)
|
||||
{
|
||||
// Don't rotate special files or if there's not one currently open.
|
||||
if ( ! file || IsSpecial(Info().path) )
|
||||
if ( ! fd || IsSpecial(Info().path) )
|
||||
return true;
|
||||
|
||||
fclose(file);
|
||||
file = 0;
|
||||
CloseFile(close);
|
||||
|
||||
string nname = rotated_path + "." + LogExt();
|
||||
string nname = string(rotated_path) + "." + LogExt();
|
||||
rename(fname.c_str(), nname.c_str());
|
||||
|
||||
if ( ! FinishedRotation(nname, fname, open, close, terminating) )
|
||||
if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) )
|
||||
{
|
||||
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
|
||||
return false;
|
||||
|
@ -346,9 +400,33 @@ bool Ascii::DoSetBuf(bool enabled)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool Ascii::DoHeartbeat(double network_time, double current_time)
|
||||
{
|
||||
// Nothing to do.
|
||||
return true;
|
||||
}
|
||||
|
||||
string Ascii::LogExt()
|
||||
{
|
||||
const char* ext = getenv("BRO_LOG_SUFFIX");
|
||||
if ( ! ext ) ext = "log";
|
||||
if ( ! ext )
|
||||
ext = "log";
|
||||
|
||||
return ext;
|
||||
}
|
||||
|
||||
string Ascii::Timestamp(double t)
|
||||
{
|
||||
time_t teatime = time_t(t);
|
||||
|
||||
struct tm tmbuf;
|
||||
struct tm* tm = localtime_r(&teatime, &tmbuf);
|
||||
|
||||
char tmp[128];
|
||||
const char* const date_fmt = "%Y-%m-%d-%H-%M-%S";
|
||||
strftime(tmp, sizeof(tmp), date_fmt, tm);
|
||||
|
||||
return tmp;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -24,23 +24,27 @@ protected:
|
|||
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
||||
threading::Value** vals);
|
||||
virtual bool DoSetBuf(bool enabled);
|
||||
virtual bool DoRotate(string rotated_path, double open,
|
||||
virtual bool DoRotate(const char* rotated_path, double open,
|
||||
double close, bool terminating);
|
||||
virtual bool DoFlush();
|
||||
virtual bool DoFinish();
|
||||
virtual bool DoFlush(double network_time);
|
||||
virtual bool DoFinish(double network_time);
|
||||
virtual bool DoHeartbeat(double network_time, double current_time);
|
||||
|
||||
private:
|
||||
bool IsSpecial(string path) { return path.find("/dev/") == 0; }
|
||||
bool DoWriteOne(ODesc* desc, threading::Value* val, const threading::Field* field);
|
||||
bool WriteHeaderField(const string& key, const string& value);
|
||||
void CloseFile(double t);
|
||||
string Timestamp(double t);
|
||||
|
||||
FILE* file;
|
||||
int fd;
|
||||
string fname;
|
||||
ODesc desc;
|
||||
bool ascii_done;
|
||||
|
||||
// Options set from the script-level.
|
||||
bool output_to_stdout;
|
||||
bool include_header;
|
||||
bool include_meta;
|
||||
|
||||
char* separator;
|
||||
int separator_len;
|
||||
|
@ -54,8 +58,8 @@ private:
|
|||
char* unset_field;
|
||||
int unset_field_len;
|
||||
|
||||
char* header_prefix;
|
||||
int header_prefix_len;
|
||||
char* meta_prefix;
|
||||
int meta_prefix_len;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -78,10 +78,10 @@ std::string DataSeries::LogValueToString(threading::Value *val)
|
|||
case TYPE_STRING:
|
||||
case TYPE_FILE:
|
||||
case TYPE_FUNC:
|
||||
if ( ! val->val.string_val->size() )
|
||||
if ( ! val->val.string_val.length )
|
||||
return "";
|
||||
|
||||
return string(val->val.string_val->data(), val->val.string_val->size());
|
||||
return string(val->val.string_val.data, val->val.string_val.length);
|
||||
|
||||
case TYPE_TABLE:
|
||||
{
|
||||
|
@ -302,7 +302,8 @@ bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading:
|
|||
|
||||
if( ds_dump_schema )
|
||||
{
|
||||
FILE* pFile = fopen ( string(info.path + ".ds.xml").c_str() , "wb" );
|
||||
string name = string(info.path) + ".ds.xml";
|
||||
FILE* pFile = fopen(name.c_str(), "wb" );
|
||||
|
||||
if( pFile )
|
||||
{
|
||||
|
@ -311,7 +312,7 @@ bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading:
|
|||
}
|
||||
|
||||
else
|
||||
Error(Fmt("cannot dump schema: %s", strerror(errno)));
|
||||
Error(Fmt("cannot dump schema: %s", Strerror(errno)));
|
||||
}
|
||||
|
||||
compress_type = Extent::compress_all;
|
||||
|
@ -343,7 +344,7 @@ bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading:
|
|||
return OpenLog(info.path);
|
||||
}
|
||||
|
||||
bool DataSeries::DoFlush()
|
||||
bool DataSeries::DoFlush(double network_time)
|
||||
{
|
||||
// Flushing is handled by DataSeries automatically, so this function
|
||||
// doesn't do anything.
|
||||
|
@ -366,11 +367,10 @@ void DataSeries::CloseLog()
|
|||
log_file = 0;
|
||||
}
|
||||
|
||||
bool DataSeries::DoFinish()
|
||||
bool DataSeries::DoFinish(double network_time)
|
||||
{
|
||||
CloseLog();
|
||||
|
||||
return WriterBackend::DoFinish();
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
|
||||
|
@ -395,17 +395,17 @@ bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
|
|||
return true;
|
||||
}
|
||||
|
||||
bool DataSeries::DoRotate(string rotated_path, double open, double close, bool terminating)
|
||||
bool DataSeries::DoRotate(const char* rotated_path, double open, double close, bool terminating)
|
||||
{
|
||||
// Note that if DS files are rotated too often, the aggregate log
|
||||
// size will be (much) larger.
|
||||
CloseLog();
|
||||
|
||||
string dsname = Info().path + ".ds";
|
||||
string nname = rotated_path + ".ds";
|
||||
string dsname = string(Info().path) + ".ds";
|
||||
string nname = string(rotated_path) + ".ds";
|
||||
rename(dsname.c_str(), nname.c_str());
|
||||
|
||||
if ( ! FinishedRotation(nname, dsname, open, close, terminating) )
|
||||
if ( ! FinishedRotation(nname.c_str(), dsname.c_str(), open, close, terminating) )
|
||||
{
|
||||
Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str()));
|
||||
return false;
|
||||
|
@ -420,4 +420,9 @@ bool DataSeries::DoSetBuf(bool enabled)
|
|||
return true;
|
||||
}
|
||||
|
||||
bool DataSeries::DoHeartbeat(double network_time, double current_time)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
#endif /* USE_DATASERIES */
|
||||
|
|
|
@ -32,10 +32,11 @@ protected:
|
|||
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
||||
threading::Value** vals);
|
||||
virtual bool DoSetBuf(bool enabled);
|
||||
virtual bool DoRotate(string rotated_path, double open,
|
||||
virtual bool DoRotate(const char* rotated_path, double open,
|
||||
double close, bool terminating);
|
||||
virtual bool DoFlush();
|
||||
virtual bool DoFinish();
|
||||
virtual bool DoFlush(double network_time);
|
||||
virtual bool DoFinish(double network_time);
|
||||
virtual bool DoHeartbeat(double network_time, double current_time);
|
||||
|
||||
private:
|
||||
static const size_t ROW_MIN = 2048; // Minimum extent size.
|
||||
|
|
416
src/logging/writers/ElasticSearch.cc
Normal file
416
src/logging/writers/ElasticSearch.cc
Normal file
|
@ -0,0 +1,416 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// This is experimental code that is not yet ready for production usage.
//

#include "config.h"

#ifdef USE_ELASTICSEARCH

#include "util.h" // Needs to come first for stdint.h

#include <string>
#include <errno.h>

#include "BroString.h"
#include "NetVar.h"
#include "threading/SerialTypes.h"

#include <curl/curl.h>
#include <curl/easy.h>

#include "ElasticSearch.h"

using namespace logging;
using namespace writer;
using threading::Value;
using threading::Field;

ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
{
cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
cluster_name = new char[cluster_name_len + 1];
memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
cluster_name[cluster_name_len] = 0;

index_prefix = string((const char*) BifConst::LogElasticSearch::index_prefix->Bytes(), BifConst::LogElasticSearch::index_prefix->Len());

es_server = string(Fmt("http://%s:%d", BifConst::LogElasticSearch::server_host->Bytes(),
(int) BifConst::LogElasticSearch::server_port));
bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));

http_headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
buffer.Clear();
counter = 0;
current_index = string();
prev_index = string();
last_send = current_time();
failing = false;

transfer_timeout = BifConst::LogElasticSearch::transfer_timeout * 1000;

curl_handle = HTTPSetup();
}

ElasticSearch::~ElasticSearch()
{
delete [] cluster_name;
}

bool ElasticSearch::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
{
return true;
}

bool ElasticSearch::DoFlush(double network_time)
{
BatchIndex();
return true;
}

bool ElasticSearch::DoFinish(double network_time)
{
BatchIndex();
curl_slist_free_all(http_headers);
curl_easy_cleanup(curl_handle);
return true;
}

bool ElasticSearch::BatchIndex()
{
curl_easy_reset(curl_handle);
curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
failing = ! HTTPSend(curl_handle);

// We currently throw the data out regardless of whether the send succeeded. Fire and forget!
buffer.Clear();
counter = 0;
last_send = current_time();

return true;
}

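As a point of reference, the following is a minimal, self-contained sketch of the libcurl calls that BatchIndex() relies on, posting a hard-coded two-line bulk body. The URL, index name, and field values are made up for illustration; the real writer takes them from the script-level constants and the log record.

#include <curl/curl.h>

int main()
	{
	curl_global_init(CURL_GLOBAL_DEFAULT);

	CURL* handle = curl_easy_init();
	if ( ! handle )
		return 1;

	// One bulk entry: the action line followed by the document, both newline-terminated.
	const char* body =
		"{\"index\":{\"_index\":\"bro\",\"_type\":\"conn\"}}\n"
		"{\"ts\":1342094400000}\n";

	struct curl_slist* headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");

	curl_easy_setopt(handle, CURLOPT_URL, "http://localhost:9200/_bulk");
	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
	curl_easy_setopt(handle, CURLOPT_POST, 1L);
	curl_easy_setopt(handle, CURLOPT_POSTFIELDS, body);

	CURLcode rc = curl_easy_perform(handle);

	curl_slist_free_all(headers);
	curl_easy_cleanup(handle);
	curl_global_cleanup();
	return rc == CURLE_OK ? 0 : 1;
	}
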
bool ElasticSearch::AddValueToBuffer(ODesc* b, Value* val)
{
switch ( val->type )
{
// ES treats 0 as false and any other value as true, so bool types go here.
case TYPE_BOOL:
case TYPE_INT:
b->Add(val->val.int_val);
break;

case TYPE_COUNT:
case TYPE_COUNTER:
{
// ElasticSearch doesn't seem to support unsigned 64bit ints.
if ( val->val.uint_val >= INT64_MAX )
{
Error(Fmt("count value too large: %" PRIu64, val->val.uint_val));
b->AddRaw("null", 4);
}
else
b->Add(val->val.uint_val);
break;
}

case TYPE_PORT:
b->Add(val->val.port_val.port);
break;

case TYPE_SUBNET:
b->AddRaw("\"", 1);
b->Add(Render(val->val.subnet_val));
b->AddRaw("\"", 1);
break;

case TYPE_ADDR:
b->AddRaw("\"", 1);
b->Add(Render(val->val.addr_val));
b->AddRaw("\"", 1);
break;

case TYPE_DOUBLE:
case TYPE_INTERVAL:
b->Add(val->val.double_val);
break;

case TYPE_TIME:
{
// ElasticSearch uses milliseconds for timestamps and JSON only
// supports signed ints (uints can be too large).
uint64_t ts = (uint64_t) (val->val.double_val * 1000);
if ( ts >= INT64_MAX )
{
Error(Fmt("time value too large: %" PRIu64, ts));
b->AddRaw("null", 4);
}
else
b->Add(ts);
break;
}

case TYPE_ENUM:
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
{
b->AddRaw("\"", 1);
for ( int i = 0; i < val->val.string_val.length; ++i )
{
char c = val->val.string_val.data[i];
// Escape special characters as 2-byte Unicode (\u00xx) sequences.
if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
{
static const char hex_chars[] = "0123456789abcdef";
b->AddRaw("\\u00", 4);
b->AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
b->AddRaw(&hex_chars[c & 0x0f], 1);
}
else
b->AddRaw(&c, 1);
}
b->AddRaw("\"", 1);
break;
}

case TYPE_TABLE:
{
b->AddRaw("[", 1);
for ( int j = 0; j < val->val.set_val.size; j++ )
{
if ( j > 0 )
b->AddRaw(",", 1);
AddValueToBuffer(b, val->val.set_val.vals[j]);
}
b->AddRaw("]", 1);
break;
}

case TYPE_VECTOR:
{
b->AddRaw("[", 1);
for ( int j = 0; j < val->val.vector_val.size; j++ )
{
if ( j > 0 )
b->AddRaw(",", 1);
AddValueToBuffer(b, val->val.vector_val.vals[j]);
}
b->AddRaw("]", 1);
break;
}

default:
return false;
}
return true;
}

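For illustration, here is a standalone sketch of the \u00xx escaping rule applied above to string-like values, using a plain std::string in place of Bro's ODesc buffer. The cast to unsigned char for the hex lookup is an addition to keep the sketch well-defined for bytes above 127; the sample input is invented.

#include <cstdio>
#include <string>

static std::string escape_for_json(const std::string& in)
	{
	static const char hex_chars[] = "0123456789abcdef";
	std::string out;

	for ( char c : in )
		{
		if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
			{
			// Emit the byte as a \u00xx escape, as the writer does.
			unsigned char uc = static_cast<unsigned char>(c);
			out += "\\u00";
			out += hex_chars[(uc & 0xf0) >> 4];
			out += hex_chars[uc & 0x0f];
			}
		else
			out += c;
		}

	return out;
	}

int main()
	{
	// Prints: He said \u0022hi\u0022\u000aand left.
	printf("%s\n", escape_for_json("He said \"hi\"\nand left.").c_str());
	return 0;
	}
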
bool ElasticSearch::AddFieldToBuffer(ODesc *b, Value* val, const Field* field)
{
if ( ! val->present )
return false;

b->AddRaw("\"", 1);
b->Add(field->name);
b->AddRaw("\":", 2);
AddValueToBuffer(b, val);
return true;
}

bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
Value** vals)
{
if ( current_index.empty() )
UpdateIndex(network_time, Info().rotation_interval, Info().rotation_base);

// Our action line looks like: {"index":{"_index":"<current_index>","_type":"<path>"}}
buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
buffer.Add(current_index);
buffer.AddRaw("\",\"_type\":\"", 11);
buffer.Add(Info().path);
buffer.AddRaw("\"}}\n", 4);

buffer.AddRaw("{", 1);
for ( int i = 0; i < num_fields; i++ )
{
if ( i > 0 && buffer.Bytes()[buffer.Len() - 1] != ',' && vals[i]->present )
buffer.AddRaw(",", 1);
AddFieldToBuffer(&buffer, vals[i], fields[i]);
}
buffer.AddRaw("}\n", 2);

counter++;
if ( counter >= BifConst::LogElasticSearch::max_batch_size ||
uint(buffer.Len()) >= BifConst::LogElasticSearch::max_byte_size )
BatchIndex();

return true;
}

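To make the buffer layout concrete, this sketch prints the two newline-terminated lines that DoWrite() appends to the batch for a single record; the index name, type, and field values are invented for the example.

#include <cstdio>
#include <string>

int main()
	{
	// Action line followed by the document, in the shape DoWrite() assembles them.
	std::string action = "{\"index\":{\"_index\":\"bro-201207121200\",\"_type\":\"conn\"}}\n";
	std::string doc = "{\"ts\":1342094400000,\"id.orig_h\":\"10.0.0.1\",\"id.resp_p\":80}\n";

	printf("%s%s", action.c_str(), doc.c_str());
	return 0;
	}
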
bool ElasticSearch::UpdateIndex(double now, double rinterval, double rbase)
{
if ( rinterval == 0 )
{
// If logs aren't being rotated, don't use a rotation-oriented index name.
current_index = index_prefix;
}
else
{
double nr = calc_next_rotate(now, rinterval, rbase);
double interval_beginning = now - (rinterval - nr);

struct tm tm;
char buf[128];
time_t teatime = (time_t)interval_beginning;
localtime_r(&teatime, &tm);
strftime(buf, sizeof(buf), "%Y%m%d%H%M", &tm);

prev_index = current_index;
current_index = index_prefix + "-" + buf;

// Send some metadata about this index.
buffer.AddRaw("{\"index\":{\"_index\":\"@", 21);
buffer.Add(index_prefix);
buffer.AddRaw("-meta\",\"_type\":\"index\",\"_id\":\"", 30);
buffer.Add(current_index);
buffer.AddRaw("-", 1);
buffer.Add(Info().rotation_base);
buffer.AddRaw("-", 1);
buffer.Add(Info().rotation_interval);
buffer.AddRaw("\"}}\n{\"name\":\"", 13);
buffer.Add(current_index);
buffer.AddRaw("\",\"start\":", 10);
buffer.Add(interval_beginning);
buffer.AddRaw(",\"end\":", 7);
buffer.Add(interval_beginning+rinterval);
buffer.AddRaw("}\n", 2);
}

//printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
return true;
}

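A rough standalone sketch of how the per-interval index suffix above is derived. calc_next_rotate() is approximated here with plain modulo arithmetic that ignores rbase, and the "bro" prefix and timestamp are just example values, so treat this as an illustration only.

#include <cstdio>
#include <ctime>

int main()
	{
	double now = 1342094523.0;   // example wall-clock time
	double rinterval = 3600.0;   // one-hour rotation interval

	// Approximation of calc_next_rotate(): seconds until the next rotation boundary.
	double nr = rinterval - (double)((time_t) now % (time_t) rinterval);
	double interval_beginning = now - (rinterval - nr);

	struct tm tm;
	char buf[128];
	time_t teatime = (time_t) interval_beginning;
	localtime_r(&teatime, &tm);
	strftime(buf, sizeof(buf), "%Y%m%d%H%M", &tm);

	// Prints something like "bro-201207121200", depending on the local time zone.
	printf("bro-%s\n", buf);
	return 0;
	}
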
bool ElasticSearch::DoRotate(const char* rotated_path, double open, double close, bool terminating)
{
// Update the currently used index to the new rotation interval.
UpdateIndex(close, Info().rotation_interval, Info().rotation_base);

// Only do this stuff if there was a previous index.
if ( ! prev_index.empty() )
{
// FIXME: I think this section is taking too long and causing the thread to die.

// Compress the previous index
//curl_easy_reset(curl_handle);
//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings", es_server.c_str(), prev_index.c_str()));
//curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, "{\"index\":{\"store.compress.stored\":\"true\"}}");
//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) 42);
//HTTPSend(curl_handle);

// Optimize the previous index.
// TODO: make this into variables.
//curl_easy_reset(curl_handle);
//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
//HTTPSend(curl_handle);
}

if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) )
{
Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
}

return true;
}

bool ElasticSearch::DoSetBuf(bool enabled)
{
// Nothing to do.
return true;
}

bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
{
if ( last_send > 0 && buffer.Len() > 0 &&
current_time-last_send > BifConst::LogElasticSearch::max_batch_interval )
{
BatchIndex();
}

return true;
}

CURL* ElasticSearch::HTTPSetup()
{
CURL* handle = curl_easy_init();
if ( ! handle )
{
Error("cURL did not initialize correctly.");
return 0;
}

return handle;
}

bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
{
//TODO: Do some verification on the result?
return true;
}

bool ElasticSearch::HTTPSend(CURL *handle)
{
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed.
// The best (only?) way to disable that is to just use HTTP 1.0
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);

//curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, transfer_timeout);

CURLcode return_code = curl_easy_perform(handle);

switch ( return_code )
{
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_WRITE_ERROR:
case CURLE_RECV_ERROR:
{
if ( ! failing )
Error(Fmt("ElasticSearch server may not be accessible."));
}

case CURLE_OPERATION_TIMEDOUT:
{
if ( ! failing )
Warning(Fmt("HTTP operation with elasticsearch server timed out at %" PRIu64 " msecs.", transfer_timeout));
}

case CURLE_OK:
{
uint http_code = 0;
curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
if ( http_code == 200 )
// Hopefully everything goes through here.
return true;
else if ( ! failing )
Error(Fmt("Received a non-successful status code back from ElasticSearch server, check the elasticsearch server log."));
}

default:
{
}
}
// The "successful" return happens above
return false;
}

#endif

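One caveat worth noting: HTTPSend() passes a non-static member function to CURLOPT_WRITEFUNCTION, while libcurl documents the write callback as a plain function with the signature below. A minimal sketch of a conforming callback that simply discards the response body, which is effectively what HTTPReceive() does; the handle setup is illustrative only.

#include <curl/curl.h>
#include <cstddef>

// libcurl invokes this with chunks of the HTTP response body.
static size_t discard_response(char* ptr, size_t size, size_t nmemb, void* userdata)
	{
	// Returning size * nmemb tells libcurl the whole chunk was consumed.
	return size * nmemb;
	}

int main()
	{
	CURL* handle = curl_easy_init();
	if ( ! handle )
		return 1;

	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, discard_response);
	curl_easy_cleanup(handle);
	return 0;
	}
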
src/logging/writers/ElasticSearch.h (new file, 81 lines)
@ -0,0 +1,81 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Log writer for writing to an ElasticSearch database
//
// This is experimental code that is not yet ready for production usage.
//

#ifndef LOGGING_WRITER_ELASTICSEARCH_H
#define LOGGING_WRITER_ELASTICSEARCH_H

#include <curl/curl.h>
#include "../WriterBackend.h"

namespace logging { namespace writer {

class ElasticSearch : public WriterBackend {
public:
ElasticSearch(WriterFrontend* frontend);
~ElasticSearch();

static WriterBackend* Instantiate(WriterFrontend* frontend)
{ return new ElasticSearch(frontend); }
static string LogExt();

protected:
// Overridden from WriterBackend.

virtual bool DoInit(const WriterInfo& info, int num_fields,
const threading::Field* const* fields);

virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals);
virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(const char* rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush(double network_time);
virtual bool DoFinish(double network_time);
virtual bool DoHeartbeat(double network_time, double current_time);

private:
bool AddFieldToBuffer(ODesc *b, threading::Value* val, const threading::Field* field);
bool AddValueToBuffer(ODesc *b, threading::Value* val);
bool BatchIndex();
bool SendMappings();
bool UpdateIndex(double now, double rinterval, double rbase);

CURL* HTTPSetup();
bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
bool HTTPSend(CURL *handle);

// Buffers, etc.
ODesc buffer;
uint64 counter;
double last_send;
string current_index;
string prev_index;

CURL* curl_handle;

// From scripts
char* cluster_name;
int cluster_name_len;

string es_server;
string bulk_url;

struct curl_slist *http_headers;

string path;
string index_prefix;
uint64 transfer_timeout;
bool failing;

uint64 batch_size;
};

}
}

#endif

@ -1,4 +1,6 @@

#include <algorithm>

#include "None.h"
#include "NetVar.h"

@ -15,8 +17,17 @@ bool None::DoInit(const WriterInfo& info, int num_fields,
std::cout << " rotation_interval=" << info.rotation_interval << std::endl;
std::cout << " rotation_base=" << info.rotation_base << std::endl;

for ( std::map<string,string>::const_iterator i = info.config.begin(); i != info.config.end(); i++ )
std::cout << " config[" << i->first << "] = " << i->second << std::endl;
// Output the config sorted by keys.

std::vector<std::pair<string, string> > keys;

for ( WriterInfo::config_map::const_iterator i = info.config.begin(); i != info.config.end(); i++ )
keys.push_back(std::make_pair(i->first, i->second));

std::sort(keys.begin(), keys.end());

for ( std::vector<std::pair<string,string> >::const_iterator i = keys.begin(); i != keys.end(); i++ )
std::cout << " config[" << (*i).first << "] = " << (*i).second << std::endl;

for ( int i = 0; i < num_fields; i++ )
{

@ -31,11 +42,11 @@ bool None::DoInit(const WriterInfo& info, int num_fields,
return true;
}

bool None::DoRotate(string rotated_path, double open, double close, bool terminating)
bool None::DoRotate(const char* rotated_path, double open, double close, bool terminating)
{
if ( ! FinishedRotation(string("/dev/null"), Info().path, open, close, terminating))
if ( ! FinishedRotation("/dev/null", Info().path, open, close, terminating))
{
Error(Fmt("error rotating %s", Info().path.c_str()));
Error(Fmt("error rotating %s", Info().path));
return false;
}

@ -24,10 +24,11 @@ protected:
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals) { return true; }
virtual bool DoSetBuf(bool enabled) { return true; }
virtual bool DoRotate(string rotated_path, double open,
virtual bool DoRotate(const char* rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush() { return true; }
virtual bool DoFinish() { WriterBackend::DoFinish(); return true; }
virtual bool DoFlush(double network_time) { return true; }
virtual bool DoFinish(double network_time) { return true; }
virtual bool DoHeartbeat(double network_time, double current_time) { return true; }
};

}