mirror of
https://github.com/zeek/zeek.git
synced 2025-10-05 16:18:19 +00:00
Reworking log writer API to make it easier to pass additional
information to a writer's initialization method. However, for now the information provided is still the same.
This commit is contained in:
parent
0ca0119f2a
commit
b38d1e1ec2
14 changed files with 112 additions and 70 deletions
|
@ -2503,17 +2503,17 @@ bool RemoteSerializer::ProcessRemotePrint()
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields)
|
bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const * fields)
|
||||||
{
|
{
|
||||||
loop_over_list(peers, i)
|
loop_over_list(peers, i)
|
||||||
{
|
{
|
||||||
SendLogCreateWriter(peers[i]->id, id, writer, path, num_fields, fields);
|
SendLogCreateWriter(peers[i]->id, id, writer, info, num_fields, fields);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields)
|
bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const * fields)
|
||||||
{
|
{
|
||||||
SetErrorDescr("logging");
|
SetErrorDescr("logging");
|
||||||
|
|
||||||
|
@ -2535,8 +2535,8 @@ bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal*
|
||||||
|
|
||||||
bool success = fmt.Write(id->AsEnum(), "id") &&
|
bool success = fmt.Write(id->AsEnum(), "id") &&
|
||||||
fmt.Write(writer->AsEnum(), "writer") &&
|
fmt.Write(writer->AsEnum(), "writer") &&
|
||||||
fmt.Write(path, "path") &&
|
fmt.Write(num_fields, "num_fields") &&
|
||||||
fmt.Write(num_fields, "num_fields");
|
info.Write(&fmt);
|
||||||
|
|
||||||
if ( ! success )
|
if ( ! success )
|
||||||
goto error;
|
goto error;
|
||||||
|
@ -2691,13 +2691,13 @@ bool RemoteSerializer::ProcessLogCreateWriter()
|
||||||
fmt.StartRead(current_args->data, current_args->len);
|
fmt.StartRead(current_args->data, current_args->len);
|
||||||
|
|
||||||
int id, writer;
|
int id, writer;
|
||||||
string path;
|
|
||||||
int num_fields;
|
int num_fields;
|
||||||
|
logging::WriterBackend::WriterInfo info;
|
||||||
|
|
||||||
bool success = fmt.Read(&id, "id") &&
|
bool success = fmt.Read(&id, "id") &&
|
||||||
fmt.Read(&writer, "writer") &&
|
fmt.Read(&writer, "writer") &&
|
||||||
fmt.Read(&path, "path") &&
|
fmt.Read(&num_fields, "num_fields") &&
|
||||||
fmt.Read(&num_fields, "num_fields");
|
info.Read(&fmt);
|
||||||
|
|
||||||
if ( ! success )
|
if ( ! success )
|
||||||
goto error;
|
goto error;
|
||||||
|
@ -2716,7 +2716,7 @@ bool RemoteSerializer::ProcessLogCreateWriter()
|
||||||
id_val = new EnumVal(id, BifType::Enum::Log::ID);
|
id_val = new EnumVal(id, BifType::Enum::Log::ID);
|
||||||
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
|
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
|
||||||
|
|
||||||
if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields, true, false) )
|
if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields, true, false) )
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
Unref(id_val);
|
Unref(id_val);
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
#include "IOSource.h"
|
#include "IOSource.h"
|
||||||
#include "Stats.h"
|
#include "Stats.h"
|
||||||
#include "File.h"
|
#include "File.h"
|
||||||
|
#include "logging/WriterBackend.h"
|
||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
@ -104,10 +105,10 @@ public:
|
||||||
bool SendPrintHookEvent(BroFile* f, const char* txt, size_t len);
|
bool SendPrintHookEvent(BroFile* f, const char* txt, size_t len);
|
||||||
|
|
||||||
// Send a request to create a writer on a remote side.
|
// Send a request to create a writer on a remote side.
|
||||||
bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields);
|
bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const * fields);
|
||||||
|
|
||||||
// Broadcasts a request to create a writer.
|
// Broadcasts a request to create a writer.
|
||||||
bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Field* const * fields);
|
bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const * fields);
|
||||||
|
|
||||||
// Broadcast a log entry to everybody interested.
|
// Broadcast a log entry to everybody interested.
|
||||||
bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Value* const * vals);
|
bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const threading::Value* const * vals);
|
||||||
|
|
|
@ -74,6 +74,7 @@ struct Manager::WriterInfo {
|
||||||
double interval;
|
double interval;
|
||||||
Func* postprocessor;
|
Func* postprocessor;
|
||||||
WriterFrontend* writer;
|
WriterFrontend* writer;
|
||||||
|
WriterBackend::WriterInfo info;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Manager::Stream {
|
struct Manager::Stream {
|
||||||
|
@ -764,8 +765,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
|
||||||
for ( int j = 0; j < filter->num_fields; ++j )
|
for ( int j = 0; j < filter->num_fields; ++j )
|
||||||
arg_fields[j] = new threading::Field(*filter->fields[j]);
|
arg_fields[j] = new threading::Field(*filter->fields[j]);
|
||||||
|
|
||||||
|
WriterBackend::WriterInfo info;
|
||||||
|
info.path = path;
|
||||||
|
|
||||||
writer = CreateWriter(stream->id, filter->writer,
|
writer = CreateWriter(stream->id, filter->writer,
|
||||||
path, filter->num_fields,
|
info, filter->num_fields,
|
||||||
arg_fields, filter->local, filter->remote);
|
arg_fields, filter->local, filter->remote);
|
||||||
|
|
||||||
if ( ! writer )
|
if ( ! writer )
|
||||||
|
@ -953,7 +957,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
|
||||||
return vals;
|
return vals;
|
||||||
}
|
}
|
||||||
|
|
||||||
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info,
|
||||||
int num_fields, const threading::Field* const* fields, bool local, bool remote)
|
int num_fields, const threading::Field* const* fields, bool local, bool remote)
|
||||||
{
|
{
|
||||||
Stream* stream = FindStream(id);
|
Stream* stream = FindStream(id);
|
||||||
|
@ -963,7 +967,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
Stream::WriterMap::iterator w =
|
Stream::WriterMap::iterator w =
|
||||||
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
|
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info.path));
|
||||||
|
|
||||||
if ( w != stream->writers.end() )
|
if ( w != stream->writers.end() )
|
||||||
// If we already have a writer for this. That's fine, we just
|
// If we already have a writer for this. That's fine, we just
|
||||||
|
@ -973,7 +977,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
||||||
WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
|
WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
|
||||||
assert(writer_obj);
|
assert(writer_obj);
|
||||||
|
|
||||||
writer_obj->Init(path, num_fields, fields);
|
writer_obj->Init(info, num_fields, fields);
|
||||||
|
|
||||||
WriterInfo* winfo = new WriterInfo;
|
WriterInfo* winfo = new WriterInfo;
|
||||||
winfo->type = writer->Ref()->AsEnumVal();
|
winfo->type = writer->Ref()->AsEnumVal();
|
||||||
|
@ -982,6 +986,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
||||||
winfo->rotation_timer = 0;
|
winfo->rotation_timer = 0;
|
||||||
winfo->interval = 0;
|
winfo->interval = 0;
|
||||||
winfo->postprocessor = 0;
|
winfo->postprocessor = 0;
|
||||||
|
winfo->info = info;
|
||||||
|
|
||||||
// Search for a corresponding filter for the writer/path pair and use its
|
// Search for a corresponding filter for the writer/path pair and use its
|
||||||
// rotation settings. If no matching filter is found, fall back on
|
// rotation settings. If no matching filter is found, fall back on
|
||||||
|
@ -993,7 +998,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
||||||
{
|
{
|
||||||
Filter* f = *it;
|
Filter* f = *it;
|
||||||
if ( f->writer->AsEnum() == writer->AsEnum() &&
|
if ( f->writer->AsEnum() == writer->AsEnum() &&
|
||||||
f->path == winfo->writer->Path() )
|
f->path == winfo->writer->info.path )
|
||||||
{
|
{
|
||||||
found_filter_match = true;
|
found_filter_match = true;
|
||||||
winfo->interval = f->interval;
|
winfo->interval = f->interval;
|
||||||
|
@ -1012,7 +1017,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
||||||
InstallRotationTimer(winfo);
|
InstallRotationTimer(winfo);
|
||||||
|
|
||||||
stream->writers.insert(
|
stream->writers.insert(
|
||||||
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path),
|
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info.path),
|
||||||
winfo));
|
winfo));
|
||||||
|
|
||||||
return writer_obj;
|
return writer_obj;
|
||||||
|
@ -1093,7 +1098,7 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
|
||||||
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
|
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
|
||||||
remote_serializer->SendLogCreateWriter(peer, (*s)->id,
|
remote_serializer->SendLogCreateWriter(peer, (*s)->id,
|
||||||
&writer_val,
|
&writer_val,
|
||||||
i->first.second,
|
i->second->info,
|
||||||
writer->NumFields(),
|
writer->NumFields(),
|
||||||
writer->Fields());
|
writer->Fields());
|
||||||
}
|
}
|
||||||
|
@ -1246,7 +1251,7 @@ void Manager::Rotate(WriterInfo* winfo)
|
||||||
localtime_r(&teatime, &tm);
|
localtime_r(&teatime, &tm);
|
||||||
strftime(buf, sizeof(buf), date_fmt, &tm);
|
strftime(buf, sizeof(buf), date_fmt, &tm);
|
||||||
|
|
||||||
string tmp = string(fmt("%s-%s", winfo->writer->Path().c_str(), buf));
|
string tmp = string(fmt("%s-%s", winfo->writer->Info().path.c_str(), buf));
|
||||||
|
|
||||||
// Trigger the rotation.
|
// Trigger the rotation.
|
||||||
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
|
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
|
||||||
|
@ -1274,7 +1279,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
|
||||||
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
|
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
|
||||||
info->Assign(0, winfo->type->Ref());
|
info->Assign(0, winfo->type->Ref());
|
||||||
info->Assign(1, new StringVal(new_name.c_str()));
|
info->Assign(1, new StringVal(new_name.c_str()));
|
||||||
info->Assign(2, new StringVal(winfo->writer->Path().c_str()));
|
info->Assign(2, new StringVal(winfo->writer->Info().path.c_str()));
|
||||||
info->Assign(3, new Val(open, TYPE_TIME));
|
info->Assign(3, new Val(open, TYPE_TIME));
|
||||||
info->Assign(4, new Val(close, TYPE_TIME));
|
info->Assign(4, new Val(close, TYPE_TIME));
|
||||||
info->Assign(5, new Val(terminating, TYPE_BOOL));
|
info->Assign(5, new Val(terminating, TYPE_BOOL));
|
||||||
|
|
|
@ -9,13 +9,14 @@
|
||||||
#include "../EventHandler.h"
|
#include "../EventHandler.h"
|
||||||
#include "../RemoteSerializer.h"
|
#include "../RemoteSerializer.h"
|
||||||
|
|
||||||
|
#include "WriterBackend.h"
|
||||||
|
|
||||||
class SerializationFormat;
|
class SerializationFormat;
|
||||||
class RemoteSerializer;
|
class RemoteSerializer;
|
||||||
class RotationTimer;
|
class RotationTimer;
|
||||||
|
|
||||||
namespace logging {
|
namespace logging {
|
||||||
|
|
||||||
class WriterBackend;
|
|
||||||
class WriterFrontend;
|
class WriterFrontend;
|
||||||
class RotationFinishedMessage;
|
class RotationFinishedMessage;
|
||||||
|
|
||||||
|
@ -162,7 +163,7 @@ protected:
|
||||||
//// Function also used by the RemoteSerializer.
|
//// Function also used by the RemoteSerializer.
|
||||||
|
|
||||||
// Takes ownership of fields.
|
// Takes ownership of fields.
|
||||||
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path,
|
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info,
|
||||||
int num_fields, const threading::Field* const* fields,
|
int num_fields, const threading::Field* const* fields,
|
||||||
bool local, bool remote);
|
bool local, bool remote);
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
#include "bro_inet_ntop.h"
|
#include "bro_inet_ntop.h"
|
||||||
#include "threading/SerialTypes.h"
|
#include "threading/SerialTypes.h"
|
||||||
|
|
||||||
|
#include "Manager.h"
|
||||||
#include "WriterBackend.h"
|
#include "WriterBackend.h"
|
||||||
#include "WriterFrontend.h"
|
#include "WriterFrontend.h"
|
||||||
|
|
||||||
|
@ -60,14 +61,25 @@ public:
|
||||||
|
|
||||||
using namespace logging;
|
using namespace logging;
|
||||||
|
|
||||||
|
bool WriterBackend::WriterInfo::Read(SerializationFormat* fmt)
|
||||||
|
{
|
||||||
|
return fmt->Read(&path, "path");
|
||||||
|
}
|
||||||
|
|
||||||
|
bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const
|
||||||
|
{
|
||||||
|
return fmt->Write(path, "path");
|
||||||
|
}
|
||||||
|
|
||||||
WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
|
WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
|
||||||
{
|
{
|
||||||
path = "<path not yet set>";
|
|
||||||
num_fields = 0;
|
num_fields = 0;
|
||||||
fields = 0;
|
fields = 0;
|
||||||
buffering = true;
|
buffering = true;
|
||||||
frontend = arg_frontend;
|
frontend = arg_frontend;
|
||||||
|
|
||||||
|
info.path = "<path not yet set>";
|
||||||
|
|
||||||
SetName(frontend->Name());
|
SetName(frontend->Name());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,17 +120,17 @@ void WriterBackend::DisableFrontend()
|
||||||
SendOut(new DisableMessage(frontend));
|
SendOut(new DisableMessage(frontend));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool WriterBackend::Init(string arg_path, int arg_num_fields, const Field* const* arg_fields)
|
bool WriterBackend::Init(const WriterInfo& arg_info, int arg_num_fields, const Field* const* arg_fields)
|
||||||
{
|
{
|
||||||
path = arg_path;
|
info = arg_info;
|
||||||
num_fields = arg_num_fields;
|
num_fields = arg_num_fields;
|
||||||
fields = arg_fields;
|
fields = arg_fields;
|
||||||
|
|
||||||
string name = Fmt("%s/%s", path.c_str(), frontend->Name().c_str());
|
string name = Fmt("%s/%s", info.path.c_str(), frontend->Name().c_str());
|
||||||
|
|
||||||
SetName(name);
|
SetName(name);
|
||||||
|
|
||||||
if ( ! DoInit(arg_path, arg_num_fields, arg_fields) )
|
if ( ! DoInit(arg_info, arg_num_fields, arg_fields) )
|
||||||
{
|
{
|
||||||
DisableFrontend();
|
DisableFrontend();
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -5,12 +5,14 @@
|
||||||
#ifndef LOGGING_WRITERBACKEND_H
|
#ifndef LOGGING_WRITERBACKEND_H
|
||||||
#define LOGGING_WRITERBACKEND_H
|
#define LOGGING_WRITERBACKEND_H
|
||||||
|
|
||||||
#include "Manager.h"
|
|
||||||
|
|
||||||
#include "threading/MsgThread.h"
|
#include "threading/MsgThread.h"
|
||||||
|
|
||||||
|
class RemoteSerializer;
|
||||||
|
|
||||||
namespace logging {
|
namespace logging {
|
||||||
|
|
||||||
|
class WriterFrontend;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for writer implementation. When the logging::Manager creates a
|
* Base class for writer implementation. When the logging::Manager creates a
|
||||||
* new logging filter, it instantiates a WriterFrontend. That then in turn
|
* new logging filter, it instantiates a WriterFrontend. That then in turn
|
||||||
|
@ -41,21 +43,39 @@ public:
|
||||||
*/
|
*/
|
||||||
virtual ~WriterBackend();
|
virtual ~WriterBackend();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A struct passing information to the writer at initialization time.
|
||||||
|
*/
|
||||||
|
struct WriterInfo
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* A string left to the interpretation of the writer
|
||||||
|
* implementation; it corresponds to the value configured on
|
||||||
|
* the script-level for the logging filter.
|
||||||
|
*/
|
||||||
|
string path;
|
||||||
|
|
||||||
|
private:
|
||||||
|
friend class ::RemoteSerializer;
|
||||||
|
|
||||||
|
// Note, these need to be adapted when changing the struct's
|
||||||
|
// fields. They serialize/deserialize the struct.
|
||||||
|
bool Read(SerializationFormat* fmt);
|
||||||
|
bool Write(SerializationFormat* fmt) const;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* One-time initialization of the writer to define the logged fields.
|
* One-time initialization of the writer to define the logged fields.
|
||||||
*
|
*
|
||||||
* @param path A string left to the interpretation of the writer
|
* @param info Meta information for the writer.
|
||||||
* implementation; it corresponds to the value configured on the
|
* @param num_fields
|
||||||
* script-level for the logging filter.
|
|
||||||
*
|
|
||||||
* @param num_fields The number of log fields for the stream.
|
|
||||||
*
|
*
|
||||||
* @param fields An array of size \a num_fields with the log fields.
|
* @param fields An array of size \a num_fields with the log fields.
|
||||||
* The methods takes ownership of the array.
|
* The methods takes ownership of the array.
|
||||||
*
|
*
|
||||||
* @return False if an error occured.
|
* @return False if an error occured.
|
||||||
*/
|
*/
|
||||||
bool Init(string path, int num_fields, const threading::Field* const* fields);
|
bool Init(const WriterInfo& info, int num_fields, const threading::Field* const* fields);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes one log entry.
|
* Writes one log entry.
|
||||||
|
@ -108,9 +128,9 @@ public:
|
||||||
void DisableFrontend();
|
void DisableFrontend();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the log path as passed into the constructor.
|
* Returns the additional writer information into the constructor.
|
||||||
*/
|
*/
|
||||||
const string Path() const { return path; }
|
const WriterInfo& Info() const { return info; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the number of log fields as passed into the constructor.
|
* Returns the number of log fields as passed into the constructor.
|
||||||
|
@ -185,7 +205,7 @@ protected:
|
||||||
* disabled and eventually deleted. When returning false, an
|
* disabled and eventually deleted. When returning false, an
|
||||||
* implementation should also call Error() to indicate what happened.
|
* implementation should also call Error() to indicate what happened.
|
||||||
*/
|
*/
|
||||||
virtual bool DoInit(string path, int num_fields,
|
virtual bool DoInit(const WriterInfo& info, int num_fields,
|
||||||
const threading::Field* const* fields) = 0;
|
const threading::Field* const* fields) = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -299,7 +319,7 @@ private:
|
||||||
// this class, it's running in a different thread!
|
// this class, it's running in a different thread!
|
||||||
WriterFrontend* frontend;
|
WriterFrontend* frontend;
|
||||||
|
|
||||||
string path; // Log path.
|
WriterInfo info; // Meta information as passed to Init().
|
||||||
int num_fields; // Number of log fields.
|
int num_fields; // Number of log fields.
|
||||||
const threading::Field* const* fields; // Log fields.
|
const threading::Field* const* fields; // Log fields.
|
||||||
bool buffering; // True if buffering is enabled.
|
bool buffering; // True if buffering is enabled.
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
#include "Net.h"
|
#include "Net.h"
|
||||||
#include "threading/SerialTypes.h"
|
#include "threading/SerialTypes.h"
|
||||||
|
|
||||||
|
#include "Manager.h"
|
||||||
#include "WriterFrontend.h"
|
#include "WriterFrontend.h"
|
||||||
#include "WriterBackend.h"
|
#include "WriterBackend.h"
|
||||||
|
|
||||||
|
@ -15,14 +16,14 @@ namespace logging {
|
||||||
class InitMessage : public threading::InputMessage<WriterBackend>
|
class InitMessage : public threading::InputMessage<WriterBackend>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
InitMessage(WriterBackend* backend, const string path, const int num_fields, const Field* const* fields)
|
InitMessage(WriterBackend* backend, const WriterBackend::WriterInfo& info, const int num_fields, const Field* const* fields)
|
||||||
: threading::InputMessage<WriterBackend>("Init", backend),
|
: threading::InputMessage<WriterBackend>("Init", backend),
|
||||||
path(path), num_fields(num_fields), fields(fields) { }
|
info(info), num_fields(num_fields), fields(fields) { }
|
||||||
|
|
||||||
virtual bool Process() { return Object()->Init(path, num_fields, fields); }
|
virtual bool Process() { return Object()->Init(info, num_fields, fields); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
const string path;
|
WriterBackend::WriterInfo info;
|
||||||
const int num_fields;
|
const int num_fields;
|
||||||
const Field * const* fields;
|
const Field * const* fields;
|
||||||
};
|
};
|
||||||
|
@ -134,10 +135,10 @@ WriterFrontend::~WriterFrontend()
|
||||||
|
|
||||||
string WriterFrontend::Name() const
|
string WriterFrontend::Name() const
|
||||||
{
|
{
|
||||||
if ( path.size() )
|
if ( info.path.size() )
|
||||||
return ty_name;
|
return ty_name;
|
||||||
|
|
||||||
return ty_name + "/" + path;
|
return ty_name + "/" + info.path;
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriterFrontend::Stop()
|
void WriterFrontend::Stop()
|
||||||
|
@ -149,7 +150,7 @@ void WriterFrontend::Stop()
|
||||||
backend->Stop();
|
backend->Stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* const * arg_fields)
|
void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields)
|
||||||
{
|
{
|
||||||
if ( disabled )
|
if ( disabled )
|
||||||
return;
|
return;
|
||||||
|
@ -157,19 +158,19 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons
|
||||||
if ( initialized )
|
if ( initialized )
|
||||||
reporter->InternalError("writer initialize twice");
|
reporter->InternalError("writer initialize twice");
|
||||||
|
|
||||||
path = arg_path;
|
info = arg_info;
|
||||||
num_fields = arg_num_fields;
|
num_fields = arg_num_fields;
|
||||||
fields = arg_fields;
|
fields = arg_fields;
|
||||||
|
|
||||||
initialized = true;
|
initialized = true;
|
||||||
|
|
||||||
if ( backend )
|
if ( backend )
|
||||||
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
|
backend->SendIn(new InitMessage(backend, arg_info, arg_num_fields, arg_fields));
|
||||||
|
|
||||||
if ( remote )
|
if ( remote )
|
||||||
remote_serializer->SendLogCreateWriter(stream,
|
remote_serializer->SendLogCreateWriter(stream,
|
||||||
writer,
|
writer,
|
||||||
arg_path,
|
arg_info,
|
||||||
arg_num_fields,
|
arg_num_fields,
|
||||||
arg_fields);
|
arg_fields);
|
||||||
|
|
||||||
|
@ -183,7 +184,7 @@ void WriterFrontend::Write(int num_fields, Value** vals)
|
||||||
if ( remote )
|
if ( remote )
|
||||||
remote_serializer->SendLogWrite(stream,
|
remote_serializer->SendLogWrite(stream,
|
||||||
writer,
|
writer,
|
||||||
path,
|
info.path,
|
||||||
num_fields,
|
num_fields,
|
||||||
vals);
|
vals);
|
||||||
|
|
||||||
|
|
|
@ -3,13 +3,13 @@
|
||||||
#ifndef LOGGING_WRITERFRONTEND_H
|
#ifndef LOGGING_WRITERFRONTEND_H
|
||||||
#define LOGGING_WRITERFRONTEND_H
|
#define LOGGING_WRITERFRONTEND_H
|
||||||
|
|
||||||
#include "Manager.h"
|
#include "WriterBackend.h"
|
||||||
|
|
||||||
#include "threading/MsgThread.h"
|
#include "threading/MsgThread.h"
|
||||||
|
|
||||||
namespace logging {
|
namespace logging {
|
||||||
|
|
||||||
class WriterBackend;
|
class Manager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bridge class between the logging::Manager and backend writer threads. The
|
* Bridge class between the logging::Manager and backend writer threads. The
|
||||||
|
@ -68,7 +68,7 @@ public:
|
||||||
*
|
*
|
||||||
* This method must only be called from the main thread.
|
* This method must only be called from the main thread.
|
||||||
*/
|
*/
|
||||||
void Init(string path, int num_fields, const threading::Field* const* fields);
|
void Init(const WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const* fields);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write out a record.
|
* Write out a record.
|
||||||
|
@ -169,9 +169,9 @@ public:
|
||||||
bool Disabled() { return disabled; }
|
bool Disabled() { return disabled; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the log path as passed into the constructor.
|
* Returns the additional writer information as passed into the constructor.
|
||||||
*/
|
*/
|
||||||
const string Path() const { return path; }
|
const WriterBackend::WriterInfo& Info() const { return info; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the number of log fields as passed into the constructor.
|
* Returns the number of log fields as passed into the constructor.
|
||||||
|
@ -207,7 +207,7 @@ protected:
|
||||||
bool remote; // True if loggin remotely.
|
bool remote; // True if loggin remotely.
|
||||||
|
|
||||||
string ty_name; // Name of the backend type. Set by the manager.
|
string ty_name; // Name of the backend type. Set by the manager.
|
||||||
string path; // The log path.
|
WriterBackend::WriterInfo info; // The writer information.
|
||||||
int num_fields; // The number of log fields.
|
int num_fields; // The number of log fields.
|
||||||
const threading::Field* const* fields; // The log fields.
|
const threading::Field* const* fields; // The log fields.
|
||||||
|
|
||||||
|
|
|
@ -69,8 +69,10 @@ bool Ascii::WriteHeaderField(const string& key, const string& val)
|
||||||
return (fwrite(str.c_str(), str.length(), 1, file) == 1);
|
return (fwrite(str.c_str(), str.length(), 1, file) == 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Ascii::DoInit(string path, int num_fields, const Field* const * fields)
|
bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields)
|
||||||
{
|
{
|
||||||
|
string path = info.path;
|
||||||
|
|
||||||
if ( output_to_stdout )
|
if ( output_to_stdout )
|
||||||
path = "/dev/stdout";
|
path = "/dev/stdout";
|
||||||
|
|
||||||
|
@ -290,7 +292,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
|
||||||
Value** vals)
|
Value** vals)
|
||||||
{
|
{
|
||||||
if ( ! file )
|
if ( ! file )
|
||||||
DoInit(Path(), NumFields(), Fields());
|
DoInit(Info(), NumFields(), Fields());
|
||||||
|
|
||||||
desc.Clear();
|
desc.Clear();
|
||||||
|
|
||||||
|
@ -320,7 +322,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
|
||||||
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating)
|
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating)
|
||||||
{
|
{
|
||||||
// Don't rotate special files or if there's not one currently open.
|
// Don't rotate special files or if there's not one currently open.
|
||||||
if ( ! file || IsSpecial(Path()) )
|
if ( ! file || IsSpecial(Info().path) )
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
fclose(file);
|
fclose(file);
|
||||||
|
|
|
@ -19,7 +19,7 @@ public:
|
||||||
static string LogExt();
|
static string LogExt();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual bool DoInit(string path, int num_fields,
|
virtual bool DoInit(const WriterInfo& info, int num_fields,
|
||||||
const threading::Field* const* fields);
|
const threading::Field* const* fields);
|
||||||
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
||||||
threading::Value** vals);
|
threading::Value** vals);
|
||||||
|
|
|
@ -263,7 +263,7 @@ bool DataSeries::OpenLog(string path)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DataSeries::DoInit(string path, int num_fields, const threading::Field* const * fields)
|
bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const * fields)
|
||||||
{
|
{
|
||||||
// We first construct an XML schema thing (and, if ds_dump_schema is
|
// We first construct an XML schema thing (and, if ds_dump_schema is
|
||||||
// set, dump it to path + ".ds.xml"). Assuming that goes well, we
|
// set, dump it to path + ".ds.xml"). Assuming that goes well, we
|
||||||
|
@ -298,11 +298,11 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
|
||||||
schema_list.push_back(val);
|
schema_list.push_back(val);
|
||||||
}
|
}
|
||||||
|
|
||||||
string schema = BuildDSSchemaFromFieldTypes(schema_list, path);
|
string schema = BuildDSSchemaFromFieldTypes(schema_list, info.path);
|
||||||
|
|
||||||
if( ds_dump_schema )
|
if( ds_dump_schema )
|
||||||
{
|
{
|
||||||
FILE* pFile = fopen ( string(path + ".ds.xml").c_str() , "wb" );
|
FILE* pFile = fopen ( string(info.path + ".ds.xml").c_str() , "wb" );
|
||||||
|
|
||||||
if( pFile )
|
if( pFile )
|
||||||
{
|
{
|
||||||
|
@ -340,7 +340,7 @@ bool DataSeries::DoInit(string path, int num_fields, const threading::Field* con
|
||||||
log_type = log_types.registerTypePtr(schema);
|
log_type = log_types.registerTypePtr(schema);
|
||||||
log_series.setType(log_type);
|
log_series.setType(log_type);
|
||||||
|
|
||||||
return OpenLog(path);
|
return OpenLog(info.path);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DataSeries::DoFlush()
|
bool DataSeries::DoFlush()
|
||||||
|
@ -401,7 +401,7 @@ bool DataSeries::DoRotate(string rotated_path, double open, double close, bool t
|
||||||
// size will be (much) larger.
|
// size will be (much) larger.
|
||||||
CloseLog();
|
CloseLog();
|
||||||
|
|
||||||
string dsname = Path() + ".ds";
|
string dsname = Info().path + ".ds";
|
||||||
string nname = rotated_path + ".ds";
|
string nname = rotated_path + ".ds";
|
||||||
rename(dsname.c_str(), nname.c_str());
|
rename(dsname.c_str(), nname.c_str());
|
||||||
|
|
||||||
|
@ -411,7 +411,7 @@ bool DataSeries::DoRotate(string rotated_path, double open, double close, bool t
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return OpenLog(Path());
|
return OpenLog(Info().path);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DataSeries::DoSetBuf(bool enabled)
|
bool DataSeries::DoSetBuf(bool enabled)
|
||||||
|
|
|
@ -26,7 +26,7 @@ public:
|
||||||
protected:
|
protected:
|
||||||
// Overidden from WriterBackend.
|
// Overidden from WriterBackend.
|
||||||
|
|
||||||
virtual bool DoInit(string path, int num_fields,
|
virtual bool DoInit(const WriterInfo& info, int num_fields,
|
||||||
const threading::Field* const * fields);
|
const threading::Field* const * fields);
|
||||||
|
|
||||||
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
||||||
|
|
|
@ -6,9 +6,9 @@ using namespace writer;
|
||||||
|
|
||||||
bool None::DoRotate(string rotated_path, double open, double close, bool terminating)
|
bool None::DoRotate(string rotated_path, double open, double close, bool terminating)
|
||||||
{
|
{
|
||||||
if ( ! FinishedRotation(string("/dev/null"), Path(), open, close, terminating))
|
if ( ! FinishedRotation(string("/dev/null"), Info().path, open, close, terminating))
|
||||||
{
|
{
|
||||||
Error(Fmt("error rotating %s", Path().c_str()));
|
Error(Fmt("error rotating %s", Info().path.c_str()));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,7 @@ public:
|
||||||
{ return new None(frontend); }
|
{ return new None(frontend); }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual bool DoInit(string path, int num_fields,
|
virtual bool DoInit(const WriterInfo& info, int num_fields,
|
||||||
const threading::Field* const * fields) { return true; }
|
const threading::Field* const * fields) { return true; }
|
||||||
|
|
||||||
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue