Remote logging for the new logging framework.

It works with a simple example, but that's as much testing as it has
seen so far.

Remote::Destination has a new attribute "request_logs: bool"
indicating whether we are interested in the peer's log. Default is
false. If true, Bro will send an explicit "I want your logs" message
over to the other side, which will then start sending log records
back.

When such log records are received, they will be recorded exactly in
the same way as on the remote side, i.e., same fields/writer/path. All
filtering is already performed on the remote side.

Log::Filter has two new attributes, "log_local: bool" and
"log_remote: bool" (both true by default). If log_local is false, this
filter will not record anything locally but still process everything
normally otherwise and potentially forward to remote. If log_remote is
false, this filter will never send anything to remote even if a peer
has requested logs. (Note that with the defaults, requesting logs will
mean getting everything.)

Note that with log forwarding, *both* sides must create the
Filter::Stream. If the remote sends log records for a specific stream,
but the local side hasn't created it, the data will be discarded.
Filtes on the other hand shouldn't created locally; and if they are,
they are ignored for records received from remote).
This commit is contained in:
Robin Sommer 2011-03-03 16:35:51 -08:00
parent c355f5d1fa
commit 3f413a2539
11 changed files with 690 additions and 59 deletions

View file

@ -43,6 +43,9 @@ export {
# Whether to perform state synchronization with peer.
sync: bool &default = T;
# Whether to requests logs from the peer.
request_logs: bool &default = F;
# When performing state synchronization, whether we consider
# our state to be authoritative. If so, we will send the peer
# our current set when the connection is set up.
@ -176,6 +179,12 @@ function setup_peer(p: event_peer, dst: Destination)
request_remote_sync(p, dst$auth);
}
if ( dst$request_logs )
{
do_script_log(p, "requesting logs");
request_remote_logs(p);
}
dst$peer = p;
dst$connected = T;
connected_peers[p$id] = dst;

View file

@ -51,6 +51,154 @@ struct LogMgr::Stream {
~Stream();
};
bool LogVal::Read(SerializationFormat* fmt)
{
int ty;
if ( ! (fmt->Read(&ty, "type") && fmt->Read(&present, "present")) )
return false;
type = (TypeTag)(ty);
if ( ! present )
return true;
switch ( type ) {
case TYPE_BOOL:
case TYPE_INT:
case TYPE_ENUM:
return fmt->Read(&val.int_val, "int");
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
return fmt->Read(&val.uint_val, "uint");
case TYPE_SUBNET:
{
uint32 net[4];
if ( ! (fmt->Read(&net[0], "net0")
&& fmt->Read(&net[1], "net1")
&& fmt->Read(&net[2], "net2")
&& fmt->Read(&net[3], "net3")
&& fmt->Read(&val.subnet_val.width, "width")) )
return false;
#ifdef BROv6
val.subnet_val.net[0] = net[0];
val.subnet_val.net[1] = net[1];
val.subnet_val.net[2] = net[2];
val.subnet_val.net[3] = net[3];
#else
val.subnet_val.net = net[0];
#endif
return true;
}
case TYPE_NET:
case TYPE_ADDR:
{
uint32 addr[4];
if ( ! (fmt->Read(&addr[0], "addr0")
&& fmt->Read(&addr[1], "addr1")
&& fmt->Read(&addr[2], "addr2")
&& fmt->Read(&addr[3], "addr3")) )
return false;
#ifdef BROv6
val.addr_val[0] = addr[0];
val.addr_val[1] = addr[1];
val.addr_val[2] = addr[2];
val.addr_val[3] = addr[3];
#else
val.addr_val = addr[0];
#endif
return true;
}
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
return fmt->Read(&val.double_val, "double");
case TYPE_STRING:
{
val.string_val = new string;
return fmt->Read(val.string_val, "string");
}
default:
internal_error(::fmt("unsupported type %s in LogVal::Write", type_name(type)));
}
return false;
}
bool LogVal::Write(SerializationFormat* fmt) const
{
if ( ! (fmt->Write((int)type, "type") && fmt->Write(present, "present")) )
return false;
if ( ! present )
return true;
switch ( type ) {
case TYPE_BOOL:
case TYPE_INT:
case TYPE_ENUM:
return fmt->Write(val.int_val, "int");
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
return fmt->Write(val.uint_val, "uint");
case TYPE_SUBNET:
{
#ifdef BROv6
uint32* net = val.subnet_val;
#else
uint32 net[4];
net[0] = val.subnet_val.net;
net[1] = net[2] = net[3] = 0;
#endif
return fmt->Write(net[0], "net0")
&& fmt->Write(net[1], "net1")
&& fmt->Write(net[2], "net2")
&& fmt->Write(net[3], "net3")
&& fmt->Write(val.subnet_val.width, "width");
}
case TYPE_NET:
case TYPE_ADDR:
{
#ifdef BROv6
uint32* addr = val.addr_val;
#else
uint32 addr[4];
addr[0] = val.addr_val;
addr[1] = addr[2] = addr[3] = 0;
#endif
return fmt->Write(addr[0], "addr0")
&& fmt->Write(addr[1], "addr1")
&& fmt->Write(addr[2], "addr2")
&& fmt->Write(addr[3], "addr3");
}
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
return fmt->Write(val.double_val, "double");
case TYPE_STRING:
return fmt->Write(*val.string_val, "string");
default:
internal_error(::fmt("unsupported type %s in LogVal::REad", type_name(type)));
}
return false;
}
LogMgr::Filter::~Filter()
{
@ -87,7 +235,7 @@ LogMgr::Stream* LogMgr::FindStream(EnumVal* id)
if ( idx >= streams.size() || ! streams[idx] )
{
run_time("unknown log stream");
run_time(fmt("unknown log stream (%d)", id->AsEnum()));
return 0;
}
@ -260,9 +408,8 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
return false;
// Find the right writer type.
int writer = 0;
int idx = rtype->FieldOffset("writer");
writer = fval->LookupWithDefault(idx)->AsEnum();
EnumVal* writer = fval->LookupWithDefault(idx)->AsEnumVal();
// Create a new Filter instance.
@ -273,7 +420,7 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
filter->name = fval->Lookup(rtype->FieldOffset("name"))->AsString()->CheckString();
filter->pred = pred ? pred->AsFunc() : 0;
filter->path_func = path_func ? path_func->AsFunc() : 0;
filter->writer = id->Ref()->AsEnumVal();
filter->writer = writer->Ref()->AsEnumVal();
filter->local = fval->LookupWithDefault(rtype->FieldOffset("log_local"))->AsBool();
filter->remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"))->AsBool();
@ -318,8 +465,11 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
stream->filters.push_back(filter);
#ifdef DEBUG
ODesc desc;
writer->Describe(&desc);
DBG_LOG(DBG_LOGGING, "Created new filter '%s' for stream '%s'", filter->name.c_str(), stream->name.c_str());
DBG_LOG(DBG_LOGGING, " writer : %d", filter->writer);
DBG_LOG(DBG_LOGGING, " writer : %s", desc.Description());
DBG_LOG(DBG_LOGGING, " path : %s", filter->path.c_str());
DBG_LOG(DBG_LOGGING, " path_func : %s", (filter->path_func ? "set" : "not set"));
DBG_LOG(DBG_LOGGING, " pred : %s", (filter->pred ? "set" : "not set"));
@ -371,7 +521,7 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
if ( ! columns )
{
run_time("imcompatible log record type");
run_time("incompatible log record type");
return false;
}
@ -423,10 +573,6 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
#endif
}
if ( ! filter->local )
// Skip the subsequent local logging code.
continue;
// See if we already have a writer for this path.
Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path));
@ -445,6 +591,8 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new LogField(*filter->fields[j]);
if ( filter->local )
{
writer = CreateWriter(stream->id, filter->writer, path, filter->num_fields, arg_fields);
if ( ! writer )
@ -454,9 +602,18 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
}
}
if ( filter->remote )
remote_serializer->SendLogCreateWriter(stream->id, filter->writer, path, filter->num_fields, arg_fields);
}
// Alright, can do the write now.
LogVal** vals = RecordToFilterVals(filter, columns);
if ( ! writer->Write(filter->num_fields, vals) )
if ( filter->remote )
remote_serializer->SendLogWrite(stream->id, filter->writer, path, filter->num_fields, vals);
if ( filter->local && ! writer->Write(filter->num_fields, vals) )
error = true;
#ifdef DEBUG
@ -501,30 +658,28 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
if ( ! val )
continue;
vals[i] = new LogVal(type);
switch ( val->Type()->Tag() ) {
case TYPE_BOOL:
case TYPE_INT:
case TYPE_ENUM:
vals[i] = new LogVal(type);
vals[i]->val.int_val = val->InternalInt();
break;
case TYPE_COUNT:
case TYPE_COUNTER:
case TYPE_PORT:
vals[i] = new LogVal(type);
vals[i]->val.uint_val = val->InternalUnsigned();
break;
case TYPE_SUBNET:
vals[i] = new LogVal(type);
vals[i]->val.subnet_val = *val->AsSubNet();
break;
case TYPE_NET:
case TYPE_ADDR:
{
vals[i] = new LogVal(type);
addr_type t = val->AsAddr();
copy_addr(&t, &vals[i]->val.addr_val);
break;
@ -533,18 +688,13 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
vals[i] = new LogVal(type);
vals[i]->val.double_val = val->InternalDouble();
break;
case TYPE_STRING:
{
const BroString* s = val->AsString();
LogVal* lval = (LogVal*) new char[sizeof(LogVal) + sizeof(log_string_type) + s->Len()];
new (lval) LogVal(type); // Run ctor.
lval->val.string_val.len = s->Len();
memcpy(&lval->val.string_val.string, s->Bytes(), s->Len());
vals[i] = lval;
vals[i]->val.string_val = new string((const char*) s->Bytes(), s->Len());
break;
}
@ -600,6 +750,7 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, int n
// Init failed, disable by deleting factory function.
ld->factory = 0;
DBG_LOG(DBG_LOGGING, "failed to init writer class %s", ld->name);
return false;
}
@ -610,7 +761,10 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, int n
LogWriter* writer_obj = (*ld->factory)();
if ( ! writer_obj->Init(path, num_fields, fields) )
{
DBG_LOG(DBG_LOGGING, "failed to init instance of writer %s", ld->name);
return 0;
}
stream->writers.insert(Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path), writer_obj));
@ -622,24 +776,54 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, Lo
Stream* stream = FindStream(id);
if ( ! stream )
{
// Don't know this stream.
#ifdef DEBUG
ODesc desc;
id->Describe(&desc);
DBG_LOG(DBG_LOGGING, "unknown stream %s in LogMgr::Write()", desc.Description());
#endif
return false;
}
Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
if ( w == stream->writers.end() )
{
// Don't know this writer.
#ifdef DEBUG
ODesc desc;
id->Describe(&desc);
DBG_LOG(DBG_LOGGING, "unknown writer %s in LogMgr::Write()", desc.Description());
#endif
return false;
}
bool success = w->second->Write(num_fields, vals);
#ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to '%s' on stream '%s'", path.c_str(), stream->name.c_str());
#endif
DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to path '%s' on stream '%s' [%s]", path.c_str(), stream->name.c_str(), (success ? "ok" : "error"));
return success;
}
void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer)
{
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
Stream* stream = (*s);
if ( ! stream )
continue;
for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ )
{
LogWriter* writer = i->second;
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id, &writer_val, i->first.second, writer->NumFields(), writer->Fields());
}
}
}
bool LogMgr::SetBuf(EnumVal* id, bool enabled)
{
Stream* stream = FindStream(id);
@ -672,3 +856,4 @@ void LogMgr::Error(LogWriter* writer, const char* msg)
{
run_time(fmt("error with writer for %s: %s", writer->Path().c_str(), msg));
}

View file

@ -6,24 +6,31 @@
#include "Val.h"
#include "EventHandler.h"
#include "RemoteSerializer.h"
class SerializationFormat;
struct LogField {
LogField() { }
LogField(const LogField& other) : name(other.name), type(other.type) { }
string name;
TypeTag type;
};
// A string that we can directly include as part of the value union below.
struct log_string_type {
int len;
char string[]; // The string starts right here.
LogField() { }
LogField(const LogField& other) : name(other.name), type(other.type) { }
bool Read(SerializationFormat* fmt)
{
int t;
bool success = fmt->Read(&name, "name") && fmt->Read(&t, "type");
type = (TypeTag) t;
return success;
}
bool Write(SerializationFormat* fmt) const
{ return fmt->Write(name, "name") && fmt->Write((int)type, "type"); }
};
// All values that can be directly logged by a Writer.
struct LogVal {
LogVal(TypeTag arg_type, bool arg_present = true) : type(arg_type), present(arg_present) {}
TypeTag type;
bool present; // If false, the field is unset (i.e., &optional and not initialzed).
@ -35,8 +42,17 @@ struct LogVal {
addr_type addr_val;
subnet_type subnet_val;
double double_val;
log_string_type string_val;
string* string_val;
} val;
LogVal(TypeTag arg_type = TYPE_ERROR, bool arg_present = true) : type(arg_type), present(arg_present) {}
~LogVal() { if ( type == TYPE_STRING && present ) delete val.string_val; }
bool Read(SerializationFormat* fmt);
bool Write(SerializationFormat* fmt) const;
private:
LogVal(const LogVal& other) { }
};
class LogWriter;
@ -60,10 +76,10 @@ protected:
friend class LogWriter;
friend class RemoteSerializer;
// These function are also used by the RemoteSerializer to inject
// received logs.
LogWriter* CreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, LogField** fields);
bool Write(EnumVal* id, EnumVal* writer, string path, int num_fields, LogVal** vals);
// These function are also used by the RemoteSerializer.
LogWriter* CreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, LogField** fields); // takes ownership of fields.
bool Write(EnumVal* id, EnumVal* writer, string path, int num_fields, LogVal** vals); // takes ownership of vals.
void SendAllWritersTo(RemoteSerializer::PeerID peer);
/// Functions also used by the writers.

View file

@ -38,11 +38,17 @@ bool LogWriter::Write(int arg_num_fields, LogVal** vals)
// Double-check that the arguments match. If we get this from remote,
// something might be mixed up.
if ( num_fields != arg_num_fields )
{
DBG_LOG(DBG_LOGGING, "Number of fields don't match in LogWriter::Write() (%d vs. %d)", arg_num_fields, num_fields);
return false;
}
for ( int i = 0; i < num_fields; ++i )
if ( vals[i]->type != fields[i]->type )
{
DBG_LOG(DBG_LOGGING, "Field type doesn't match in LogWriter::Write() (%d vs. %d)", vals[i]->type, fields[i]->type);
return false;
}
bool result = DoWrite(num_fields, fields, vals);
DeleteVals(vals);

View file

@ -43,6 +43,10 @@ public:
// performed.
void Finish();
int NumFields() const { return num_fields; }
const LogField* const * Fields() const { return fields; }
protected:
// Methods for Writers to override. If any of these returs false, it will

View file

@ -114,7 +114,7 @@ bool LogWriterAscii::DoWrite(int num_fields, LogField** fields, LogVal** vals)
break;
case TYPE_STRING:
desc.AddN((const char*)&val->val.string_val.string, val->val.string_val.len);
desc.AddN(val->val.string_val->data(), val->val.string_val->size());
break;
default:

View file

@ -183,6 +183,7 @@
#include "Sessions.h"
#include "File.h"
#include "Conn.h"
#include "LogMgr.h"
extern "C" {
#include "setsignal.h"
@ -190,7 +191,7 @@ extern "C" {
// Gets incremented each time there's an incompatible change
// to the communication internals.
static const unsigned short PROTOCOL_VERSION = 0x06;
static const unsigned short PROTOCOL_VERSION = 0x07;
static const char MSG_NONE = 0x00;
static const char MSG_VERSION = 0x01;
@ -216,9 +217,12 @@ static const char MSG_SYNC_POINT = 0x14;
static const char MSG_TERMINATE = 0x15;
static const char MSG_DEBUG_DUMP = 0x16;
static const char MSG_REMOTE_PRINT = 0x17;
static const char MSG_LOG_CREATE_WRITER = 0x18;
static const char MSG_LOG_WRITE = 0x19;
static const char MSG_REQUEST_LOGS = 0x20;
// Update this one whenever adding a new ID:
static const char MSG_ID_MAX = MSG_REMOTE_PRINT;
static const char MSG_ID_MAX = MSG_REQUEST_LOGS;
static const uint32 FINAL_SYNC_POINT = /* UINT32_MAX */ 4294967295U;
@ -226,6 +230,9 @@ static const uint32 FINAL_SYNC_POINT = /* UINT32_MAX */ 4294967295U;
static const int PRINT_BUFFER_SIZE = 10 * 1024;
static const int SOCKBUF_SIZE = 1024 * 1024;
// Buffer size for remote-log data.
static const int LOG_BUFFER_SIZE = 50 * 1024;
struct ping_args {
uint32 seq;
double time1; // Round-trip time parent1<->parent2
@ -303,6 +310,9 @@ static const char* msgToStr(int msg)
MSG_STR(MSG_TERMINATE)
MSG_STR(MSG_DEBUG_DUMP)
MSG_STR(MSG_REMOTE_PRINT)
MSG_STR(MSG_LOG_CREATE_WRITER)
MSG_STR(MSG_LOG_WRITE)
MSG_STR(MSG_REQUEST_LOGS)
default:
return "UNKNOWN_MSG";
}
@ -469,7 +479,10 @@ static inline bool is_peer_msg(int msg)
msg == MSG_CAPS ||
msg == MSG_COMPRESS ||
msg == MSG_SYNC_POINT ||
msg == MSG_REMOTE_PRINT;
msg == MSG_REMOTE_PRINT ||
msg == MSG_LOG_CREATE_WRITER ||
msg == MSG_LOG_WRITE ||
msg == MSG_REQUEST_LOGS;
}
bool RemoteSerializer::IsConnectedPeer(PeerID id)
@ -704,6 +717,7 @@ bool RemoteSerializer::CloseConnection(Peer* peer)
return true;
FlushPrintBuffer(peer);
FlushLogBuffer(peer);
Log(LogInfo, "closing connection", peer);
@ -738,6 +752,31 @@ bool RemoteSerializer::RequestSync(PeerID id, bool auth)
return true;
}
bool RemoteSerializer::RequestLogs(PeerID id)
{
if ( ! using_communication )
return true;
Peer* peer = LookupPeer(id, true);
if ( ! peer )
{
run_time(fmt("unknown peer id %d for request logs", int(id)));
return false;
}
if ( peer->phase != Peer::HANDSHAKE )
{
run_time(fmt("can't request logs from peer; wrong phase %d",
peer->phase));
return false;
}
if ( ! SendToChild(MSG_REQUEST_LOGS, peer, 0) )
return false;
return true;
}
bool RemoteSerializer::RequestEvents(PeerID id, RE_Matcher* pattern)
{
if ( ! using_communication )
@ -1443,6 +1482,7 @@ bool RemoteSerializer::Poll(bool may_block)
case MSG_PHASE_DONE:
case MSG_TERMINATE:
case MSG_DEBUG_DUMP:
case MSG_REQUEST_LOGS:
{
// No further argument chunk.
msgstate = TYPE;
@ -1465,6 +1505,8 @@ bool RemoteSerializer::Poll(bool may_block)
case MSG_LOG:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
case MSG_LOG_CREATE_WRITER:
case MSG_LOG_WRITE:
{
// One further argument chunk.
msgstate = ARGS;
@ -1601,6 +1643,15 @@ bool RemoteSerializer::DoMessage()
case MSG_REMOTE_PRINT:
return ProcessRemotePrint();
case MSG_LOG_CREATE_WRITER:
return ProcessLogCreateWriter();
case MSG_LOG_WRITE:
return ProcessLogWrite();
case MSG_REQUEST_LOGS:
return ProcessRequestLogs();
default:
DEBUG_COMM(fmt("unexpected msg type: %d",
int(current_msgtype)));
@ -1663,6 +1714,7 @@ void RemoteSerializer::PeerConnected(Peer* peer)
peer->cache_out->Clear();
peer->our_runtime = int(current_time(true) - bro_start_time);
peer->sync_point = 0;
peer->logs_requested = false;
if ( ! SendCMsgToChild(MSG_VERSION, peer) )
return;
@ -1725,6 +1777,7 @@ RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port,
peer->orig = false;
peer->accept_state = false;
peer->send_state = false;
peer->logs_requested = false;
peer->caps = 0;
peer->comp_level = 0;
peer->suspended_processing = false;
@ -1735,6 +1788,8 @@ RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port,
peer->sync_point = 0;
peer->print_buffer = 0;
peer->print_buffer_used = 0;
peer->log_buffer = 0;
peer->log_buffer_used = 0;
peers.append(peer);
Log(LogInfo, "added peer", peer);
@ -1767,6 +1822,7 @@ void RemoteSerializer::RemovePeer(Peer* peer)
int id = peer->id;
Unref(peer->val);
delete [] peer->print_buffer;
delete [] peer->log_buffer;
delete peer->cache_in;
delete peer->cache_out;
delete peer;
@ -1960,6 +2016,19 @@ bool RemoteSerializer::ProcessRequestSyncMsg()
return true;
}
bool RemoteSerializer::ProcessRequestLogs()
{
if ( ! current_peer )
return false;
Log(LogInfo, "peer requested logs", current_peer);
current_peer->logs_requested = true;
current_peer->log_buffer = new char[LOG_BUFFER_SIZE];
current_peer->log_buffer_used = 0;
return true;
}
bool RemoteSerializer::ProcessPhaseDone()
{
switch ( current_peer->phase ) {
@ -2024,6 +2093,9 @@ bool RemoteSerializer::HandshakeDone(Peer* peer)
if ( (peer->caps & Peer::NEW_CACHE_STRATEGY) )
Log(LogInfo, "peer supports keep-in-cache; using that", peer);
if ( peer->logs_requested )
log_mgr->SendAllWritersTo(peer->id);
if ( peer->sync_requested != Peer::NONE )
{
if ( in_sync )
@ -2377,6 +2449,260 @@ bool RemoteSerializer::ProcessRemotePrint()
return true;
}
bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields)
{
loop_over_list(peers, i)
{
SendLogCreateWriter(peers[i]->id, id, writer, path, num_fields, fields);
}
return true;
}
bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields)
{
SetErrorDescr("logging");
ChunkedIO::Chunk* c = 0;
Peer* peer = LookupPeer(peer_id, true);
if ( ! peer )
return false;
if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING )
return false;
BinarySerializationFormat fmt;
fmt.StartWrite();
bool success = fmt.Write(id->AsEnum(), "id") &&
fmt.Write(writer->AsEnum(), "writer") &&
fmt.Write(path, "path") &&
fmt.Write(num_fields, "num_fields");
if ( ! success )
goto error;
for ( int i = 0; i < num_fields; i++ )
{
if ( ! fields[i]->Write(&fmt) )
goto error;
}
c = new ChunkedIO::Chunk;
c->len = fmt.EndWrite(&c->data);
if ( ! SendToChild(MSG_LOG_CREATE_WRITER, peer, 0) )
goto error;
if ( ! SendToChild(c) )
goto error;
return true;
error:
FatalError(io->Error());
return false;
}
bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals)
{
loop_over_list(peers, i)
{
SendLogWrite(peers[i], id, writer, path, num_fields, vals);
}
return true;
}
bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals)
{
if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING )
return false;
if ( ! peer->logs_requested )
return false;
assert(peer->log_buffer);
// Serialize the log record entry.
BinarySerializationFormat fmt;
fmt.StartWrite();
bool success = fmt.Write(id->AsEnum(), "id") &&
fmt.Write(writer->AsEnum(), "writer") &&
fmt.Write(path, "path") &&
fmt.Write(num_fields, "num_fields");
if ( ! success )
goto error;
for ( int i = 0; i < num_fields; i++ )
{
if ( ! vals[i]->Write(&fmt) )
goto error;
}
// Ok, we have the binary data now.
char* data;
int len;
len = fmt.EndWrite(&data);
// Do we have enough space in the buffer? If not, flush first.
if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) )
FlushLogBuffer(peer);
// If the data is actually larger than our complete buffer, just send it out.
if ( len > LOG_BUFFER_SIZE )
return SendToChild(MSG_LOG_WRITE, peer, data, len);
// Now we have space in the buffer, copy it into there.
memcpy(peer->log_buffer + peer->log_buffer_used, data, len);
peer->log_buffer_used += len;
assert(peer->log_buffer_used <= LOG_BUFFER_SIZE);
FlushLogBuffer(peer);
return false;
error:
FatalError(io->Error());
return false;
}
bool RemoteSerializer::FlushLogBuffer(Peer* p)
{
if ( p->state == Peer::CLOSING )
return false;
if ( ! p->log_buffer )
return true;
SendToChild(MSG_LOG_WRITE, p, p->log_buffer, p->log_buffer_used);
p->log_buffer = new char[LOG_BUFFER_SIZE];
p->log_buffer_used = 0;
return true;
}
bool RemoteSerializer::ProcessLogCreateWriter()
{
if ( current_peer->state == Peer::CLOSING )
return false;
assert(current_args);
EnumVal* id_val = 0;
EnumVal* writer_val = 0;
LogField** fields = 0;
BinarySerializationFormat fmt;
fmt.StartRead(current_args->data, current_args->len);
int id, writer;
string path;
int num_fields;
bool success = fmt.Read(&id, "id") &&
fmt.Read(&writer, "writer") &&
fmt.Read(&path, "path") &&
fmt.Read(&num_fields, "num_fields");
if ( ! success )
goto error;
fields = new LogField* [num_fields];
for ( int i = 0; i < num_fields; i++ )
{
fields[i] = new LogField;
if ( ! fields[i]->Read(&fmt) )
goto error;
}
fmt.EndRead();
id_val = new EnumVal(id, BifType::Enum::Log::ID);
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields) )
goto error;
Unref(id_val);
Unref(writer_val);
return true;
error:
Unref(id_val);
Unref(writer_val);
Error("write error for creating writer");
return false;
}
bool RemoteSerializer::ProcessLogWrite()
{
if ( current_peer->state == Peer::CLOSING )
return false;
assert(current_args);
BinarySerializationFormat fmt;
fmt.StartRead(current_args->data, current_args->len);
while ( fmt.BytesRead() != (int)current_args->len )
{
// Unserialize one entry.
EnumVal* id_val = 0;
EnumVal* writer_val = 0;
LogVal** vals = 0;
int id, writer;
string path;
int num_fields;
bool success = fmt.Read(&id, "id") &&
fmt.Read(&writer, "writer") &&
fmt.Read(&path, "path") &&
fmt.Read(&num_fields, "num_fields");
if ( ! success )
goto error;
vals = new LogVal* [num_fields];
for ( int i = 0; i < num_fields; i++ )
{
vals[i] = new LogVal;
if ( ! vals[i]->Read(&fmt) )
goto error;
}
id_val = new EnumVal(id, BifType::Enum::Log::ID);
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
success = log_mgr->Write(id_val, writer_val, path, num_fields, vals);
Unref(id_val);
Unref(writer_val);
if ( ! success )
goto error;
}
fmt.EndRead();
return true;
error:
Error("write error for log entry");
return false;
}
void RemoteSerializer::GotEvent(const char* name, double time,
EventHandlerPtr event, val_list* args)
@ -3048,6 +3374,7 @@ bool SocketComm::ProcessParentMessage()
case MSG_TERMINATE:
case MSG_PHASE_DONE:
case MSG_DEBUG_DUMP:
case MSG_REQUEST_LOGS:
{
// No further argument chunk.
parent_msgstate = TYPE;
@ -3067,6 +3394,8 @@ bool SocketComm::ProcessParentMessage()
case MSG_CAPS:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
case MSG_LOG_CREATE_WRITER:
case MSG_LOG_WRITE:
{
// One further argument chunk.
parent_msgstate = ARGS;
@ -3167,18 +3496,6 @@ bool SocketComm::DoParentMessage()
return true;
}
case MSG_PHASE_DONE:
{
// No argument block follows.
if ( parent_peer && parent_peer->connected )
{
DEBUG_COMM("child: forwarding with MSG_PHASE_DONE to peer");
if ( ! SendToPeer(parent_peer, MSG_PHASE_DONE, 0) )
return false;
}
return true;
}
case MSG_LISTEN:
return ProcessListen();
@ -3206,6 +3523,20 @@ bool SocketComm::DoParentMessage()
return ForwardChunkToPeer();
}
case MSG_PHASE_DONE:
case MSG_REQUEST_LOGS:
{
// No argument block follows.
if ( parent_peer && parent_peer->connected )
{
DEBUG_COMM(fmt("child: forwarding %s to peer", msgToStr(parent_msgtype)));
if ( ! SendToPeer(parent_peer, parent_msgtype, 0) )
return false;
}
return true;
}
case MSG_REQUEST_EVENTS:
case MSG_REQUEST_SYNC:
case MSG_SERIAL:
@ -3214,6 +3545,8 @@ bool SocketComm::DoParentMessage()
case MSG_CAPS:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
case MSG_LOG_CREATE_WRITER:
case MSG_LOG_WRITE:
assert(parent_args);
return ForwardChunkToPeer();
@ -3332,6 +3665,7 @@ bool SocketComm::ProcessRemoteMessage(SocketComm::Peer* peer)
switch ( msg->Type() ) {
case MSG_PHASE_DONE:
case MSG_REQUEST_LOGS:
// No further argument block.
DEBUG_COMM("child: forwarding with 0 args to parent");
if ( ! SendToParent(msg->Type(), peer, 0) )
@ -3388,6 +3722,8 @@ bool SocketComm::ProcessRemoteMessage(SocketComm::Peer* peer)
case MSG_CAPS:
case MSG_SYNC_POINT:
case MSG_REMOTE_PRINT:
case MSG_LOG_CREATE_WRITER:
case MSG_LOG_WRITE:
{
// Messages with one further argument block which we simply
// forward to our parent.

View file

@ -16,6 +16,8 @@
// FIXME: Change this to network byte order
class IncrementalSendTimer;
class LogField;
class LogVal;
// This class handles the communication done in Bro's main loop.
class RemoteSerializer : public Serializer, public IOSource {
@ -42,6 +44,9 @@ public:
// the peer right after the handshake.
bool RequestSync(PeerID peer, bool auth);
// Requests logs from the remote side.
bool RequestLogs(PeerID id);
// Sets flag whether we're accepting state from this peer
// (default: yes).
bool SetAcceptState(PeerID peer, bool accept);
@ -92,6 +97,15 @@ public:
// Broadcast remote print.
bool SendPrintHookEvent(BroFile* f, const char* txt);
// Send a request to create a writer on a remote side.
bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields);
// Broadcasts a request to create a writer.
bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields);
// Broadcast a log entry to everybody interested.
bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals);
// Synchronzizes time with all connected peers. Returns number of
// current sync-point, or -1 on error.
uint32 SendSyncPoint();
@ -205,6 +219,7 @@ protected:
bool accept_state; // True if we accept state from peer.
bool send_state; // True if we're supposed to initially sent our state.
int comp_level; // Compression level.
bool logs_requested; // True if the peer has requested logs.
// True if this peer triggered a net_suspend_processing().
bool suspended_processing;
@ -217,6 +232,8 @@ protected:
uint32 sync_point; // Highest sync-point received so far
char* print_buffer; // Buffer for remote print or null.
int print_buffer_used; // Number of bytes used in buffer.
char* log_buffer; // Buffer for remote log or null.
int log_buffer_used; // Number of bytes used in buffer.
};
// Shuts down remote serializer.
@ -255,6 +272,9 @@ protected:
bool ProcessCapsMsg();
bool ProcessSyncPointMsg();
bool ProcessRemotePrint();
bool ProcessLogCreateWriter();
bool ProcessLogWrite();
bool ProcessRequestLogs();
Peer* AddPeer(uint32 ip, uint16 port, PeerID id = PEER_NONE);
Peer* LookupPeer(PeerID id, bool only_if_connected);
@ -282,11 +302,13 @@ protected:
bool SendID(SerialInfo* info, Peer* peer, const ID& id);
bool SendCapabilities(Peer* peer);
bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p);
bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals);
void UnregisterHandlers(Peer* peer);
void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0);
bool EnterPhaseRunning(Peer* peer);
bool FlushPrintBuffer(Peer* p);
bool FlushLogBuffer(Peer* p);
void ChildDied();
void InternalCommError(const char* msg);

View file

@ -44,6 +44,7 @@ void SerializationFormat::StartWrite()
output_pos = 0;
bytes_written = 0;
bytes_read = 0;
}
uint32 SerializationFormat::EndWrite(char** data)
@ -64,6 +65,8 @@ bool SerializationFormat::ReadData(void* b, size_t count)
memcpy(b, input + input_pos, count);
input_pos += count;
bytes_read += count;
return true;
}
@ -214,6 +217,20 @@ bool BinarySerializationFormat::Read(char** str, int* len, const char* tag)
return true;
}
bool BinarySerializationFormat::Read(string* v, const char* tag)
{
char* buffer;
int len;
if ( ! Read(&buffer, &len, tag) )
return false;
*v = string(buffer, len);
delete [] buffer;
return true;
}
bool BinarySerializationFormat::Write(char v, const char* tag)
{
DBG_LOG(DBG_SERIAL, "Write char %s [%s]", fmt_bytes(&v, 1), tag);
@ -278,6 +295,11 @@ bool BinarySerializationFormat::Write(const char* s, const char* tag)
return Write(s, strlen(s), tag);
}
bool BinarySerializationFormat::Write(const string& s, const char* tag)
{
return Write(s.data(), s.size(), tag);
}
bool BinarySerializationFormat::WriteOpenTag(const char* tag)
{
return true;
@ -362,6 +384,12 @@ bool XMLSerializationFormat::Read(char** str, int* len, const char* tag)
return false;
}
bool XMLSerializationFormat::Read(string* s, const char* tag)
{
internal_error("no reading of xml");
return false;
}
bool XMLSerializationFormat::Write(char v, const char* tag)
{
return WriteElem(tag, "char", &v, 1);
@ -416,6 +444,11 @@ bool XMLSerializationFormat::Write(const char* s, const char* tag)
return WriteElem(tag, "string", s, strlen(s));
}
bool XMLSerializationFormat::Write(const string& s, const char* tag)
{
return WriteElem(tag, "string", s.data(), s.size());
}
bool XMLSerializationFormat::WriteOpenTag(const char* tag)
{
return WriteData("<", 1) && WriteData(tag, strlen(tag) && WriteData(">", 1));

View file

@ -5,6 +5,10 @@
#ifndef SERIALIZATION_FORMAT
#define SERIALIZATION_FORMAT
#include <string>
using namespace std;
#include "util.h"
// Abstract base class.
@ -25,6 +29,10 @@ public:
virtual bool Read(char* v, const char* tag) = 0;
virtual bool Read(bool* v, const char* tag) = 0;
virtual bool Read(double* d, const char* tag) = 0;
virtual bool Read(string* s, const char* tag) = 0;
// Returns number of raw bytes read since last call to StartRead().
int BytesRead() const { return bytes_read; }
// Passes ownership of string.
virtual bool Read(char** str, int* len, const char* tag) = 0;
@ -43,6 +51,7 @@ public:
virtual bool Write(double d, const char* tag) = 0;
virtual bool Write(const char* s, const char* tag) = 0;
virtual bool Write(const char* buf, int len, const char* tag) = 0;
virtual bool Write(const string& s, const char* tag) = 0;
virtual bool WriteOpenTag(const char* tag) = 0;
virtual bool WriteCloseTag(const char* tag) = 0;
@ -65,6 +74,7 @@ protected:
uint32 input_pos;
int bytes_written;
int bytes_read;
};
class BinarySerializationFormat : public SerializationFormat {
@ -81,6 +91,7 @@ public:
virtual bool Read(bool* v, const char* tag);
virtual bool Read(double* d, const char* tag);
virtual bool Read(char** str, int* len, const char* tag);
virtual bool Read(string* s, const char* tag);
virtual bool Write(int v, const char* tag);
virtual bool Write(uint16 v, const char* tag);
virtual bool Write(uint32 v, const char* tag);
@ -91,6 +102,7 @@ public:
virtual bool Write(double d, const char* tag);
virtual bool Write(const char* s, const char* tag);
virtual bool Write(const char* buf, int len, const char* tag);
virtual bool Write(const string& s, const char* tag);
virtual bool WriteOpenTag(const char* tag);
virtual bool WriteCloseTag(const char* tag);
virtual bool WriteSeparator();
@ -112,6 +124,7 @@ public:
virtual bool Write(double d, const char* tag);
virtual bool Write(const char* s, const char* tag);
virtual bool Write(const char* buf, int len, const char* tag);
virtual bool Write(const string& s, const char* tag);
virtual bool WriteOpenTag(const char* tag);
virtual bool WriteCloseTag(const char* tag);
virtual bool WriteSeparator();
@ -126,6 +139,7 @@ public:
virtual bool Read(bool* v, const char* tag);
virtual bool Read(double* d, const char* tag);
virtual bool Read(char** str, int* len, const char* tag);
virtual bool Read(string* s, const char* tag);
private:
// Encodes non-printable characters.

View file

@ -2108,6 +2108,12 @@ function request_remote_sync%(p: event_peer, auth: bool%) : bool
return new Val(remote_serializer->RequestSync(id, auth), TYPE_BOOL);
%}
function request_remote_logs%(p: event_peer%) : bool
%{
RemoteSerializer::PeerID id = p->AsRecordVal()->Lookup(0)->AsCount();
return new Val(remote_serializer->RequestLogs(id), TYPE_BOOL);
%}
function set_accept_state%(p: event_peer, accept: bool%) : bool
%{
RemoteSerializer::PeerID id = p->AsRecordVal()->Lookup(0)->AsCount();