diff --git a/policy/remote.bro b/policy/remote.bro index fad2beb900..ae9fc14224 100644 --- a/policy/remote.bro +++ b/policy/remote.bro @@ -43,6 +43,9 @@ export { # Whether to perform state synchronization with peer. sync: bool &default = T; + # Whether to requests logs from the peer. + request_logs: bool &default = F; + # When performing state synchronization, whether we consider # our state to be authoritative. If so, we will send the peer # our current set when the connection is set up. @@ -176,6 +179,12 @@ function setup_peer(p: event_peer, dst: Destination) request_remote_sync(p, dst$auth); } + if ( dst$request_logs ) + { + do_script_log(p, "requesting logs"); + request_remote_logs(p); + } + dst$peer = p; dst$connected = T; connected_peers[p$id] = dst; diff --git a/src/LogMgr.cc b/src/LogMgr.cc index ca566a84b3..15db1cb927 100644 --- a/src/LogMgr.cc +++ b/src/LogMgr.cc @@ -51,6 +51,154 @@ struct LogMgr::Stream { ~Stream(); }; +bool LogVal::Read(SerializationFormat* fmt) + { + int ty; + + if ( ! (fmt->Read(&ty, "type") && fmt->Read(&present, "present")) ) + return false; + + type = (TypeTag)(ty); + + if ( ! present ) + return true; + + switch ( type ) { + case TYPE_BOOL: + case TYPE_INT: + case TYPE_ENUM: + return fmt->Read(&val.int_val, "int"); + + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + return fmt->Read(&val.uint_val, "uint"); + + case TYPE_SUBNET: + { + uint32 net[4]; + if ( ! (fmt->Read(&net[0], "net0") + && fmt->Read(&net[1], "net1") + && fmt->Read(&net[2], "net2") + && fmt->Read(&net[3], "net3") + && fmt->Read(&val.subnet_val.width, "width")) ) + return false; + +#ifdef BROv6 + val.subnet_val.net[0] = net[0]; + val.subnet_val.net[1] = net[1]; + val.subnet_val.net[2] = net[2]; + val.subnet_val.net[3] = net[3]; +#else + val.subnet_val.net = net[0]; +#endif + return true; + } + + case TYPE_NET: + case TYPE_ADDR: + { + uint32 addr[4]; + if ( ! (fmt->Read(&addr[0], "addr0") + && fmt->Read(&addr[1], "addr1") + && fmt->Read(&addr[2], "addr2") + && fmt->Read(&addr[3], "addr3")) ) + return false; + +#ifdef BROv6 + val.addr_val[0] = addr[0]; + val.addr_val[1] = addr[1]; + val.addr_val[2] = addr[2]; + val.addr_val[3] = addr[3]; +#else + val.addr_val = addr[0]; +#endif + return true; + } + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + return fmt->Read(&val.double_val, "double"); + + case TYPE_STRING: + { + val.string_val = new string; + return fmt->Read(val.string_val, "string"); + } + + default: + internal_error(::fmt("unsupported type %s in LogVal::Write", type_name(type))); + } + + return false; + } + +bool LogVal::Write(SerializationFormat* fmt) const + { + if ( ! (fmt->Write((int)type, "type") && fmt->Write(present, "present")) ) + return false; + + if ( ! present ) + return true; + + switch ( type ) { + case TYPE_BOOL: + case TYPE_INT: + case TYPE_ENUM: + return fmt->Write(val.int_val, "int"); + + case TYPE_COUNT: + case TYPE_COUNTER: + case TYPE_PORT: + return fmt->Write(val.uint_val, "uint"); + + case TYPE_SUBNET: + { +#ifdef BROv6 + uint32* net = val.subnet_val; +#else + uint32 net[4]; + net[0] = val.subnet_val.net; + net[1] = net[2] = net[3] = 0; +#endif + return fmt->Write(net[0], "net0") + && fmt->Write(net[1], "net1") + && fmt->Write(net[2], "net2") + && fmt->Write(net[3], "net3") + && fmt->Write(val.subnet_val.width, "width"); + } + + case TYPE_NET: + case TYPE_ADDR: + { +#ifdef BROv6 + uint32* addr = val.addr_val; +#else + uint32 addr[4]; + addr[0] = val.addr_val; + addr[1] = addr[2] = addr[3] = 0; +#endif + return fmt->Write(addr[0], "addr0") + && fmt->Write(addr[1], "addr1") + && fmt->Write(addr[2], "addr2") + && fmt->Write(addr[3], "addr3"); + } + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + return fmt->Write(val.double_val, "double"); + + case TYPE_STRING: + return fmt->Write(*val.string_val, "string"); + + default: + internal_error(::fmt("unsupported type %s in LogVal::REad", type_name(type))); + } + + return false; + } LogMgr::Filter::~Filter() { @@ -87,7 +235,7 @@ LogMgr::Stream* LogMgr::FindStream(EnumVal* id) if ( idx >= streams.size() || ! streams[idx] ) { - run_time("unknown log stream"); + run_time(fmt("unknown log stream (%d)", id->AsEnum())); return 0; } @@ -260,9 +408,8 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval) return false; // Find the right writer type. - int writer = 0; int idx = rtype->FieldOffset("writer"); - writer = fval->LookupWithDefault(idx)->AsEnum(); + EnumVal* writer = fval->LookupWithDefault(idx)->AsEnumVal(); // Create a new Filter instance. @@ -273,7 +420,7 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval) filter->name = fval->Lookup(rtype->FieldOffset("name"))->AsString()->CheckString(); filter->pred = pred ? pred->AsFunc() : 0; filter->path_func = path_func ? path_func->AsFunc() : 0; - filter->writer = id->Ref()->AsEnumVal(); + filter->writer = writer->Ref()->AsEnumVal(); filter->local = fval->LookupWithDefault(rtype->FieldOffset("log_local"))->AsBool(); filter->remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"))->AsBool(); @@ -318,8 +465,11 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval) stream->filters.push_back(filter); #ifdef DEBUG + ODesc desc; + writer->Describe(&desc); + DBG_LOG(DBG_LOGGING, "Created new filter '%s' for stream '%s'", filter->name.c_str(), stream->name.c_str()); - DBG_LOG(DBG_LOGGING, " writer : %d", filter->writer); + DBG_LOG(DBG_LOGGING, " writer : %s", desc.Description()); DBG_LOG(DBG_LOGGING, " path : %s", filter->path.c_str()); DBG_LOG(DBG_LOGGING, " path_func : %s", (filter->path_func ? "set" : "not set")); DBG_LOG(DBG_LOGGING, " pred : %s", (filter->pred ? "set" : "not set")); @@ -371,7 +521,7 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) if ( ! columns ) { - run_time("imcompatible log record type"); + run_time("incompatible log record type"); return false; } @@ -423,10 +573,6 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) #endif } - if ( ! filter->local ) - // Skip the subsequent local logging code. - continue; - // See if we already have a writer for this path. Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path)); @@ -445,18 +591,29 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns) for ( int j = 0; j < filter->num_fields; ++j ) arg_fields[j] = new LogField(*filter->fields[j]); - writer = CreateWriter(stream->id, filter->writer, path, filter->num_fields, arg_fields); - - if ( ! writer ) + if ( filter->local ) { - Unref(columns); - return false; + writer = CreateWriter(stream->id, filter->writer, path, filter->num_fields, arg_fields); + + if ( ! writer ) + { + Unref(columns); + return false; + } } + + if ( filter->remote ) + remote_serializer->SendLogCreateWriter(stream->id, filter->writer, path, filter->num_fields, arg_fields); } // Alright, can do the write now. + LogVal** vals = RecordToFilterVals(filter, columns); - if ( ! writer->Write(filter->num_fields, vals) ) + + if ( filter->remote ) + remote_serializer->SendLogWrite(stream->id, filter->writer, path, filter->num_fields, vals); + + if ( filter->local && ! writer->Write(filter->num_fields, vals) ) error = true; #ifdef DEBUG @@ -501,30 +658,28 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns) if ( ! val ) continue; + vals[i] = new LogVal(type); + switch ( val->Type()->Tag() ) { case TYPE_BOOL: case TYPE_INT: case TYPE_ENUM: - vals[i] = new LogVal(type); vals[i]->val.int_val = val->InternalInt(); break; case TYPE_COUNT: case TYPE_COUNTER: case TYPE_PORT: - vals[i] = new LogVal(type); vals[i]->val.uint_val = val->InternalUnsigned(); break; case TYPE_SUBNET: - vals[i] = new LogVal(type); vals[i]->val.subnet_val = *val->AsSubNet(); break; case TYPE_NET: case TYPE_ADDR: { - vals[i] = new LogVal(type); addr_type t = val->AsAddr(); copy_addr(&t, &vals[i]->val.addr_val); break; @@ -533,18 +688,13 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns) case TYPE_DOUBLE: case TYPE_TIME: case TYPE_INTERVAL: - vals[i] = new LogVal(type); vals[i]->val.double_val = val->InternalDouble(); break; case TYPE_STRING: { const BroString* s = val->AsString(); - LogVal* lval = (LogVal*) new char[sizeof(LogVal) + sizeof(log_string_type) + s->Len()]; - new (lval) LogVal(type); // Run ctor. - lval->val.string_val.len = s->Len(); - memcpy(&lval->val.string_val.string, s->Bytes(), s->Len()); - vals[i] = lval; + vals[i]->val.string_val = new string((const char*) s->Bytes(), s->Len()); break; } @@ -600,6 +750,7 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, int n // Init failed, disable by deleting factory function. ld->factory = 0; + DBG_LOG(DBG_LOGGING, "failed to init writer class %s", ld->name); return false; } @@ -610,7 +761,10 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path, int n LogWriter* writer_obj = (*ld->factory)(); if ( ! writer_obj->Init(path, num_fields, fields) ) + { + DBG_LOG(DBG_LOGGING, "failed to init instance of writer %s", ld->name); return 0; + } stream->writers.insert(Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path), writer_obj)); @@ -622,24 +776,54 @@ bool LogMgr::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, Lo Stream* stream = FindStream(id); if ( ! stream ) + { // Don't know this stream. +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_LOGGING, "unknown stream %s in LogMgr::Write()", desc.Description()); +#endif return false; + } Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path)); if ( w == stream->writers.end() ) + { // Don't know this writer. +#ifdef DEBUG + ODesc desc; + id->Describe(&desc); + DBG_LOG(DBG_LOGGING, "unknown writer %s in LogMgr::Write()", desc.Description()); +#endif return false; + } bool success = w->second->Write(num_fields, vals); -#ifdef DEBUG - DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to '%s' on stream '%s'", path.c_str(), stream->name.c_str()); -#endif + DBG_LOG(DBG_LOGGING, "Wrote pre-filtered record to path '%s' on stream '%s' [%s]", path.c_str(), stream->name.c_str(), (success ? "ok" : "error")); return success; } +void LogMgr::SendAllWritersTo(RemoteSerializer::PeerID peer) + { + for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) + { + Stream* stream = (*s); + + if ( ! stream ) + continue; + + for ( Stream::WriterMap::iterator i = stream->writers.begin(); i != stream->writers.end(); i++ ) + { + LogWriter* writer = i->second; + EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer); + remote_serializer->SendLogCreateWriter(peer, (*s)->id, &writer_val, i->first.second, writer->NumFields(), writer->Fields()); + } + } + } + bool LogMgr::SetBuf(EnumVal* id, bool enabled) { Stream* stream = FindStream(id); @@ -672,3 +856,4 @@ void LogMgr::Error(LogWriter* writer, const char* msg) { run_time(fmt("error with writer for %s: %s", writer->Path().c_str(), msg)); } + diff --git a/src/LogMgr.h b/src/LogMgr.h index ca1cda8e96..069ddfdff2 100644 --- a/src/LogMgr.h +++ b/src/LogMgr.h @@ -6,24 +6,31 @@ #include "Val.h" #include "EventHandler.h" +#include "RemoteSerializer.h" + +class SerializationFormat; struct LogField { - LogField() { } - LogField(const LogField& other) : name(other.name), type(other.type) { } string name; TypeTag type; -}; -// A string that we can directly include as part of the value union below. -struct log_string_type { - int len; - char string[]; // The string starts right here. + LogField() { } + LogField(const LogField& other) : name(other.name), type(other.type) { } + + bool Read(SerializationFormat* fmt) + { + int t; + bool success = fmt->Read(&name, "name") && fmt->Read(&t, "type"); + type = (TypeTag) t; + return success; + } + + bool Write(SerializationFormat* fmt) const + { return fmt->Write(name, "name") && fmt->Write((int)type, "type"); } }; // All values that can be directly logged by a Writer. struct LogVal { - LogVal(TypeTag arg_type, bool arg_present = true) : type(arg_type), present(arg_present) {} - TypeTag type; bool present; // If false, the field is unset (i.e., &optional and not initialzed). @@ -35,8 +42,17 @@ struct LogVal { addr_type addr_val; subnet_type subnet_val; double double_val; - log_string_type string_val; + string* string_val; } val; + + LogVal(TypeTag arg_type = TYPE_ERROR, bool arg_present = true) : type(arg_type), present(arg_present) {} + ~LogVal() { if ( type == TYPE_STRING && present ) delete val.string_val; } + + bool Read(SerializationFormat* fmt); + bool Write(SerializationFormat* fmt) const; + +private: + LogVal(const LogVal& other) { } }; class LogWriter; @@ -60,10 +76,10 @@ protected: friend class LogWriter; friend class RemoteSerializer; - // These function are also used by the RemoteSerializer to inject - // received logs. - LogWriter* CreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, LogField** fields); - bool Write(EnumVal* id, EnumVal* writer, string path, int num_fields, LogVal** vals); + // These function are also used by the RemoteSerializer. + LogWriter* CreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, LogField** fields); // takes ownership of fields. + bool Write(EnumVal* id, EnumVal* writer, string path, int num_fields, LogVal** vals); // takes ownership of vals. + void SendAllWritersTo(RemoteSerializer::PeerID peer); /// Functions also used by the writers. diff --git a/src/LogWriter.cc b/src/LogWriter.cc index 53a36476a3..6c27bb55b5 100644 --- a/src/LogWriter.cc +++ b/src/LogWriter.cc @@ -38,11 +38,17 @@ bool LogWriter::Write(int arg_num_fields, LogVal** vals) // Double-check that the arguments match. If we get this from remote, // something might be mixed up. if ( num_fields != arg_num_fields ) + { + DBG_LOG(DBG_LOGGING, "Number of fields don't match in LogWriter::Write() (%d vs. %d)", arg_num_fields, num_fields); return false; + } for ( int i = 0; i < num_fields; ++i ) if ( vals[i]->type != fields[i]->type ) + { + DBG_LOG(DBG_LOGGING, "Field type doesn't match in LogWriter::Write() (%d vs. %d)", vals[i]->type, fields[i]->type); return false; + } bool result = DoWrite(num_fields, fields, vals); DeleteVals(vals); diff --git a/src/LogWriter.h b/src/LogWriter.h index 86ca4d10a8..0dd37d2881 100644 --- a/src/LogWriter.h +++ b/src/LogWriter.h @@ -43,6 +43,10 @@ public: // performed. void Finish(); + + int NumFields() const { return num_fields; } + const LogField* const * Fields() const { return fields; } + protected: // Methods for Writers to override. If any of these returs false, it will diff --git a/src/LogWriterAscii.cc b/src/LogWriterAscii.cc index 1b9eab68c8..902cbf7ea2 100644 --- a/src/LogWriterAscii.cc +++ b/src/LogWriterAscii.cc @@ -114,7 +114,7 @@ bool LogWriterAscii::DoWrite(int num_fields, LogField** fields, LogVal** vals) break; case TYPE_STRING: - desc.AddN((const char*)&val->val.string_val.string, val->val.string_val.len); + desc.AddN(val->val.string_val->data(), val->val.string_val->size()); break; default: diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 15b1872680..f1224797bc 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -183,6 +183,7 @@ #include "Sessions.h" #include "File.h" #include "Conn.h" +#include "LogMgr.h" extern "C" { #include "setsignal.h" @@ -190,7 +191,7 @@ extern "C" { // Gets incremented each time there's an incompatible change // to the communication internals. -static const unsigned short PROTOCOL_VERSION = 0x06; +static const unsigned short PROTOCOL_VERSION = 0x07; static const char MSG_NONE = 0x00; static const char MSG_VERSION = 0x01; @@ -216,9 +217,12 @@ static const char MSG_SYNC_POINT = 0x14; static const char MSG_TERMINATE = 0x15; static const char MSG_DEBUG_DUMP = 0x16; static const char MSG_REMOTE_PRINT = 0x17; +static const char MSG_LOG_CREATE_WRITER = 0x18; +static const char MSG_LOG_WRITE = 0x19; +static const char MSG_REQUEST_LOGS = 0x20; // Update this one whenever adding a new ID: -static const char MSG_ID_MAX = MSG_REMOTE_PRINT; +static const char MSG_ID_MAX = MSG_REQUEST_LOGS; static const uint32 FINAL_SYNC_POINT = /* UINT32_MAX */ 4294967295U; @@ -226,6 +230,9 @@ static const uint32 FINAL_SYNC_POINT = /* UINT32_MAX */ 4294967295U; static const int PRINT_BUFFER_SIZE = 10 * 1024; static const int SOCKBUF_SIZE = 1024 * 1024; +// Buffer size for remote-log data. +static const int LOG_BUFFER_SIZE = 50 * 1024; + struct ping_args { uint32 seq; double time1; // Round-trip time parent1<->parent2 @@ -303,6 +310,9 @@ static const char* msgToStr(int msg) MSG_STR(MSG_TERMINATE) MSG_STR(MSG_DEBUG_DUMP) MSG_STR(MSG_REMOTE_PRINT) + MSG_STR(MSG_LOG_CREATE_WRITER) + MSG_STR(MSG_LOG_WRITE) + MSG_STR(MSG_REQUEST_LOGS) default: return "UNKNOWN_MSG"; } @@ -469,7 +479,10 @@ static inline bool is_peer_msg(int msg) msg == MSG_CAPS || msg == MSG_COMPRESS || msg == MSG_SYNC_POINT || - msg == MSG_REMOTE_PRINT; + msg == MSG_REMOTE_PRINT || + msg == MSG_LOG_CREATE_WRITER || + msg == MSG_LOG_WRITE || + msg == MSG_REQUEST_LOGS; } bool RemoteSerializer::IsConnectedPeer(PeerID id) @@ -704,6 +717,7 @@ bool RemoteSerializer::CloseConnection(Peer* peer) return true; FlushPrintBuffer(peer); + FlushLogBuffer(peer); Log(LogInfo, "closing connection", peer); @@ -738,6 +752,31 @@ bool RemoteSerializer::RequestSync(PeerID id, bool auth) return true; } +bool RemoteSerializer::RequestLogs(PeerID id) + { + if ( ! using_communication ) + return true; + + Peer* peer = LookupPeer(id, true); + if ( ! peer ) + { + run_time(fmt("unknown peer id %d for request logs", int(id))); + return false; + } + + if ( peer->phase != Peer::HANDSHAKE ) + { + run_time(fmt("can't request logs from peer; wrong phase %d", + peer->phase)); + return false; + } + + if ( ! SendToChild(MSG_REQUEST_LOGS, peer, 0) ) + return false; + + return true; + } + bool RemoteSerializer::RequestEvents(PeerID id, RE_Matcher* pattern) { if ( ! using_communication ) @@ -1443,6 +1482,7 @@ bool RemoteSerializer::Poll(bool may_block) case MSG_PHASE_DONE: case MSG_TERMINATE: case MSG_DEBUG_DUMP: + case MSG_REQUEST_LOGS: { // No further argument chunk. msgstate = TYPE; @@ -1465,6 +1505,8 @@ bool RemoteSerializer::Poll(bool may_block) case MSG_LOG: case MSG_SYNC_POINT: case MSG_REMOTE_PRINT: + case MSG_LOG_CREATE_WRITER: + case MSG_LOG_WRITE: { // One further argument chunk. msgstate = ARGS; @@ -1601,6 +1643,15 @@ bool RemoteSerializer::DoMessage() case MSG_REMOTE_PRINT: return ProcessRemotePrint(); + case MSG_LOG_CREATE_WRITER: + return ProcessLogCreateWriter(); + + case MSG_LOG_WRITE: + return ProcessLogWrite(); + + case MSG_REQUEST_LOGS: + return ProcessRequestLogs(); + default: DEBUG_COMM(fmt("unexpected msg type: %d", int(current_msgtype))); @@ -1663,6 +1714,7 @@ void RemoteSerializer::PeerConnected(Peer* peer) peer->cache_out->Clear(); peer->our_runtime = int(current_time(true) - bro_start_time); peer->sync_point = 0; + peer->logs_requested = false; if ( ! SendCMsgToChild(MSG_VERSION, peer) ) return; @@ -1725,6 +1777,7 @@ RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port, peer->orig = false; peer->accept_state = false; peer->send_state = false; + peer->logs_requested = false; peer->caps = 0; peer->comp_level = 0; peer->suspended_processing = false; @@ -1735,6 +1788,8 @@ RemoteSerializer::Peer* RemoteSerializer::AddPeer(uint32 ip, uint16 port, peer->sync_point = 0; peer->print_buffer = 0; peer->print_buffer_used = 0; + peer->log_buffer = 0; + peer->log_buffer_used = 0; peers.append(peer); Log(LogInfo, "added peer", peer); @@ -1767,6 +1822,7 @@ void RemoteSerializer::RemovePeer(Peer* peer) int id = peer->id; Unref(peer->val); delete [] peer->print_buffer; + delete [] peer->log_buffer; delete peer->cache_in; delete peer->cache_out; delete peer; @@ -1960,6 +2016,19 @@ bool RemoteSerializer::ProcessRequestSyncMsg() return true; } +bool RemoteSerializer::ProcessRequestLogs() + { + if ( ! current_peer ) + return false; + + Log(LogInfo, "peer requested logs", current_peer); + + current_peer->logs_requested = true; + current_peer->log_buffer = new char[LOG_BUFFER_SIZE]; + current_peer->log_buffer_used = 0; + return true; + } + bool RemoteSerializer::ProcessPhaseDone() { switch ( current_peer->phase ) { @@ -2024,6 +2093,9 @@ bool RemoteSerializer::HandshakeDone(Peer* peer) if ( (peer->caps & Peer::NEW_CACHE_STRATEGY) ) Log(LogInfo, "peer supports keep-in-cache; using that", peer); + if ( peer->logs_requested ) + log_mgr->SendAllWritersTo(peer->id); + if ( peer->sync_requested != Peer::NONE ) { if ( in_sync ) @@ -2377,6 +2449,260 @@ bool RemoteSerializer::ProcessRemotePrint() return true; } +bool RemoteSerializer::SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields) + { + loop_over_list(peers, i) + { + SendLogCreateWriter(peers[i]->id, id, writer, path, num_fields, fields); + } + + return true; + } + +bool RemoteSerializer::SendLogCreateWriter(PeerID peer_id, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields) + { + SetErrorDescr("logging"); + + ChunkedIO::Chunk* c = 0; + + Peer* peer = LookupPeer(peer_id, true); + if ( ! peer ) + return false; + + if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING ) + return false; + + BinarySerializationFormat fmt; + + fmt.StartWrite(); + + bool success = fmt.Write(id->AsEnum(), "id") && + fmt.Write(writer->AsEnum(), "writer") && + fmt.Write(path, "path") && + fmt.Write(num_fields, "num_fields"); + + if ( ! success ) + goto error; + + for ( int i = 0; i < num_fields; i++ ) + { + if ( ! fields[i]->Write(&fmt) ) + goto error; + } + + c = new ChunkedIO::Chunk; + c->len = fmt.EndWrite(&c->data); + + if ( ! SendToChild(MSG_LOG_CREATE_WRITER, peer, 0) ) + goto error; + + if ( ! SendToChild(c) ) + goto error; + + return true; + +error: + FatalError(io->Error()); + return false; + } + +bool RemoteSerializer::SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals) + { + loop_over_list(peers, i) + { + SendLogWrite(peers[i], id, writer, path, num_fields, vals); + } + + return true; + } + +bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals) + { + if ( peer->phase != Peer::HANDSHAKE && peer->phase != Peer::RUNNING ) + return false; + + if ( ! peer->logs_requested ) + return false; + + assert(peer->log_buffer); + + // Serialize the log record entry. + + BinarySerializationFormat fmt; + + fmt.StartWrite(); + + bool success = fmt.Write(id->AsEnum(), "id") && + fmt.Write(writer->AsEnum(), "writer") && + fmt.Write(path, "path") && + fmt.Write(num_fields, "num_fields"); + + if ( ! success ) + goto error; + + for ( int i = 0; i < num_fields; i++ ) + { + if ( ! vals[i]->Write(&fmt) ) + goto error; + } + + // Ok, we have the binary data now. + char* data; + int len; + + len = fmt.EndWrite(&data); + + // Do we have enough space in the buffer? If not, flush first. + if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) ) + FlushLogBuffer(peer); + + // If the data is actually larger than our complete buffer, just send it out. + if ( len > LOG_BUFFER_SIZE ) + return SendToChild(MSG_LOG_WRITE, peer, data, len); + + // Now we have space in the buffer, copy it into there. + memcpy(peer->log_buffer + peer->log_buffer_used, data, len); + peer->log_buffer_used += len; + assert(peer->log_buffer_used <= LOG_BUFFER_SIZE); + + FlushLogBuffer(peer); + return false; + +error: + FatalError(io->Error()); + return false; + } + +bool RemoteSerializer::FlushLogBuffer(Peer* p) + { + if ( p->state == Peer::CLOSING ) + return false; + + if ( ! p->log_buffer ) + return true; + + SendToChild(MSG_LOG_WRITE, p, p->log_buffer, p->log_buffer_used); + + p->log_buffer = new char[LOG_BUFFER_SIZE]; + p->log_buffer_used = 0; + return true; + } + +bool RemoteSerializer::ProcessLogCreateWriter() + { + if ( current_peer->state == Peer::CLOSING ) + return false; + + assert(current_args); + + EnumVal* id_val = 0; + EnumVal* writer_val = 0; + LogField** fields = 0; + + BinarySerializationFormat fmt; + fmt.StartRead(current_args->data, current_args->len); + + int id, writer; + string path; + int num_fields; + + bool success = fmt.Read(&id, "id") && + fmt.Read(&writer, "writer") && + fmt.Read(&path, "path") && + fmt.Read(&num_fields, "num_fields"); + + if ( ! success ) + goto error; + + fields = new LogField* [num_fields]; + + for ( int i = 0; i < num_fields; i++ ) + { + fields[i] = new LogField; + if ( ! fields[i]->Read(&fmt) ) + goto error; + } + + fmt.EndRead(); + + id_val = new EnumVal(id, BifType::Enum::Log::ID); + writer_val = new EnumVal(writer, BifType::Enum::Log::Writer); + + if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields) ) + goto error; + + Unref(id_val); + Unref(writer_val); + + return true; + +error: + Unref(id_val); + Unref(writer_val); + + Error("write error for creating writer"); + return false; + } + +bool RemoteSerializer::ProcessLogWrite() + { + if ( current_peer->state == Peer::CLOSING ) + return false; + + assert(current_args); + + BinarySerializationFormat fmt; + fmt.StartRead(current_args->data, current_args->len); + + while ( fmt.BytesRead() != (int)current_args->len ) + { + // Unserialize one entry. + EnumVal* id_val = 0; + EnumVal* writer_val = 0; + LogVal** vals = 0; + + int id, writer; + string path; + int num_fields; + + bool success = fmt.Read(&id, "id") && + fmt.Read(&writer, "writer") && + fmt.Read(&path, "path") && + fmt.Read(&num_fields, "num_fields"); + + if ( ! success ) + goto error; + + vals = new LogVal* [num_fields]; + + for ( int i = 0; i < num_fields; i++ ) + { + vals[i] = new LogVal; + if ( ! vals[i]->Read(&fmt) ) + goto error; + } + + id_val = new EnumVal(id, BifType::Enum::Log::ID); + writer_val = new EnumVal(writer, BifType::Enum::Log::Writer); + + success = log_mgr->Write(id_val, writer_val, path, num_fields, vals); + + Unref(id_val); + Unref(writer_val); + + if ( ! success ) + goto error; + + } + + fmt.EndRead(); + + return true; + +error: + Error("write error for log entry"); + return false; + } void RemoteSerializer::GotEvent(const char* name, double time, EventHandlerPtr event, val_list* args) @@ -3048,6 +3374,7 @@ bool SocketComm::ProcessParentMessage() case MSG_TERMINATE: case MSG_PHASE_DONE: case MSG_DEBUG_DUMP: + case MSG_REQUEST_LOGS: { // No further argument chunk. parent_msgstate = TYPE; @@ -3067,6 +3394,8 @@ bool SocketComm::ProcessParentMessage() case MSG_CAPS: case MSG_SYNC_POINT: case MSG_REMOTE_PRINT: + case MSG_LOG_CREATE_WRITER: + case MSG_LOG_WRITE: { // One further argument chunk. parent_msgstate = ARGS; @@ -3167,18 +3496,6 @@ bool SocketComm::DoParentMessage() return true; } - case MSG_PHASE_DONE: - { - // No argument block follows. - if ( parent_peer && parent_peer->connected ) - { - DEBUG_COMM("child: forwarding with MSG_PHASE_DONE to peer"); - if ( ! SendToPeer(parent_peer, MSG_PHASE_DONE, 0) ) - return false; - } - return true; - } - case MSG_LISTEN: return ProcessListen(); @@ -3206,6 +3523,20 @@ bool SocketComm::DoParentMessage() return ForwardChunkToPeer(); } + case MSG_PHASE_DONE: + case MSG_REQUEST_LOGS: + { + // No argument block follows. + if ( parent_peer && parent_peer->connected ) + { + DEBUG_COMM(fmt("child: forwarding %s to peer", msgToStr(parent_msgtype))); + if ( ! SendToPeer(parent_peer, parent_msgtype, 0) ) + return false; + } + + return true; + } + case MSG_REQUEST_EVENTS: case MSG_REQUEST_SYNC: case MSG_SERIAL: @@ -3214,6 +3545,8 @@ bool SocketComm::DoParentMessage() case MSG_CAPS: case MSG_SYNC_POINT: case MSG_REMOTE_PRINT: + case MSG_LOG_CREATE_WRITER: + case MSG_LOG_WRITE: assert(parent_args); return ForwardChunkToPeer(); @@ -3332,6 +3665,7 @@ bool SocketComm::ProcessRemoteMessage(SocketComm::Peer* peer) switch ( msg->Type() ) { case MSG_PHASE_DONE: + case MSG_REQUEST_LOGS: // No further argument block. DEBUG_COMM("child: forwarding with 0 args to parent"); if ( ! SendToParent(msg->Type(), peer, 0) ) @@ -3388,6 +3722,8 @@ bool SocketComm::ProcessRemoteMessage(SocketComm::Peer* peer) case MSG_CAPS: case MSG_SYNC_POINT: case MSG_REMOTE_PRINT: + case MSG_LOG_CREATE_WRITER: + case MSG_LOG_WRITE: { // Messages with one further argument block which we simply // forward to our parent. diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h index 6afec4ec6f..20886544c6 100644 --- a/src/RemoteSerializer.h +++ b/src/RemoteSerializer.h @@ -16,6 +16,8 @@ // FIXME: Change this to network byte order class IncrementalSendTimer; +class LogField; +class LogVal; // This class handles the communication done in Bro's main loop. class RemoteSerializer : public Serializer, public IOSource { @@ -42,6 +44,9 @@ public: // the peer right after the handshake. bool RequestSync(PeerID peer, bool auth); + // Requests logs from the remote side. + bool RequestLogs(PeerID id); + // Sets flag whether we're accepting state from this peer // (default: yes). bool SetAcceptState(PeerID peer, bool accept); @@ -92,6 +97,15 @@ public: // Broadcast remote print. bool SendPrintHookEvent(BroFile* f, const char* txt); + // Send a request to create a writer on a remote side. + bool SendLogCreateWriter(PeerID peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields); + + // Broadcasts a request to create a writer. + bool SendLogCreateWriter(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogField* const * fields); + + // Broadcast a log entry to everybody interested. + bool SendLogWrite(EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals); + // Synchronzizes time with all connected peers. Returns number of // current sync-point, or -1 on error. uint32 SendSyncPoint(); @@ -205,6 +219,7 @@ protected: bool accept_state; // True if we accept state from peer. bool send_state; // True if we're supposed to initially sent our state. int comp_level; // Compression level. + bool logs_requested; // True if the peer has requested logs. // True if this peer triggered a net_suspend_processing(). bool suspended_processing; @@ -217,6 +232,8 @@ protected: uint32 sync_point; // Highest sync-point received so far char* print_buffer; // Buffer for remote print or null. int print_buffer_used; // Number of bytes used in buffer. + char* log_buffer; // Buffer for remote log or null. + int log_buffer_used; // Number of bytes used in buffer. }; // Shuts down remote serializer. @@ -255,6 +272,9 @@ protected: bool ProcessCapsMsg(); bool ProcessSyncPointMsg(); bool ProcessRemotePrint(); + bool ProcessLogCreateWriter(); + bool ProcessLogWrite(); + bool ProcessRequestLogs(); Peer* AddPeer(uint32 ip, uint16 port, PeerID id = PEER_NONE); Peer* LookupPeer(PeerID id, bool only_if_connected); @@ -282,11 +302,13 @@ protected: bool SendID(SerialInfo* info, Peer* peer, const ID& id); bool SendCapabilities(Peer* peer); bool SendPacket(SerialInfo* info, Peer* peer, const Packet& p); + bool SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, string path, int num_fields, const LogVal* const * vals); void UnregisterHandlers(Peer* peer); void RaiseEvent(EventHandlerPtr event, Peer* peer, const char* arg = 0); bool EnterPhaseRunning(Peer* peer); bool FlushPrintBuffer(Peer* p); + bool FlushLogBuffer(Peer* p); void ChildDied(); void InternalCommError(const char* msg); diff --git a/src/SerializationFormat.cc b/src/SerializationFormat.cc index 55e35eb30e..c181107870 100644 --- a/src/SerializationFormat.cc +++ b/src/SerializationFormat.cc @@ -44,6 +44,7 @@ void SerializationFormat::StartWrite() output_pos = 0; bytes_written = 0; + bytes_read = 0; } uint32 SerializationFormat::EndWrite(char** data) @@ -64,6 +65,8 @@ bool SerializationFormat::ReadData(void* b, size_t count) memcpy(b, input + input_pos, count); input_pos += count; + bytes_read += count; + return true; } @@ -214,6 +217,20 @@ bool BinarySerializationFormat::Read(char** str, int* len, const char* tag) return true; } +bool BinarySerializationFormat::Read(string* v, const char* tag) + { + char* buffer; + int len; + + if ( ! Read(&buffer, &len, tag) ) + return false; + + *v = string(buffer, len); + + delete [] buffer; + return true; + } + bool BinarySerializationFormat::Write(char v, const char* tag) { DBG_LOG(DBG_SERIAL, "Write char %s [%s]", fmt_bytes(&v, 1), tag); @@ -278,6 +295,11 @@ bool BinarySerializationFormat::Write(const char* s, const char* tag) return Write(s, strlen(s), tag); } +bool BinarySerializationFormat::Write(const string& s, const char* tag) + { + return Write(s.data(), s.size(), tag); + } + bool BinarySerializationFormat::WriteOpenTag(const char* tag) { return true; @@ -362,6 +384,12 @@ bool XMLSerializationFormat::Read(char** str, int* len, const char* tag) return false; } +bool XMLSerializationFormat::Read(string* s, const char* tag) + { + internal_error("no reading of xml"); + return false; + } + bool XMLSerializationFormat::Write(char v, const char* tag) { return WriteElem(tag, "char", &v, 1); @@ -416,6 +444,11 @@ bool XMLSerializationFormat::Write(const char* s, const char* tag) return WriteElem(tag, "string", s, strlen(s)); } +bool XMLSerializationFormat::Write(const string& s, const char* tag) + { + return WriteElem(tag, "string", s.data(), s.size()); + } + bool XMLSerializationFormat::WriteOpenTag(const char* tag) { return WriteData("<", 1) && WriteData(tag, strlen(tag) && WriteData(">", 1)); diff --git a/src/SerializationFormat.h b/src/SerializationFormat.h index 8127343274..b9c7ec1549 100644 --- a/src/SerializationFormat.h +++ b/src/SerializationFormat.h @@ -5,6 +5,10 @@ #ifndef SERIALIZATION_FORMAT #define SERIALIZATION_FORMAT +#include + +using namespace std; + #include "util.h" // Abstract base class. @@ -25,6 +29,10 @@ public: virtual bool Read(char* v, const char* tag) = 0; virtual bool Read(bool* v, const char* tag) = 0; virtual bool Read(double* d, const char* tag) = 0; + virtual bool Read(string* s, const char* tag) = 0; + + // Returns number of raw bytes read since last call to StartRead(). + int BytesRead() const { return bytes_read; } // Passes ownership of string. virtual bool Read(char** str, int* len, const char* tag) = 0; @@ -43,6 +51,7 @@ public: virtual bool Write(double d, const char* tag) = 0; virtual bool Write(const char* s, const char* tag) = 0; virtual bool Write(const char* buf, int len, const char* tag) = 0; + virtual bool Write(const string& s, const char* tag) = 0; virtual bool WriteOpenTag(const char* tag) = 0; virtual bool WriteCloseTag(const char* tag) = 0; @@ -65,6 +74,7 @@ protected: uint32 input_pos; int bytes_written; + int bytes_read; }; class BinarySerializationFormat : public SerializationFormat { @@ -81,6 +91,7 @@ public: virtual bool Read(bool* v, const char* tag); virtual bool Read(double* d, const char* tag); virtual bool Read(char** str, int* len, const char* tag); + virtual bool Read(string* s, const char* tag); virtual bool Write(int v, const char* tag); virtual bool Write(uint16 v, const char* tag); virtual bool Write(uint32 v, const char* tag); @@ -91,6 +102,7 @@ public: virtual bool Write(double d, const char* tag); virtual bool Write(const char* s, const char* tag); virtual bool Write(const char* buf, int len, const char* tag); + virtual bool Write(const string& s, const char* tag); virtual bool WriteOpenTag(const char* tag); virtual bool WriteCloseTag(const char* tag); virtual bool WriteSeparator(); @@ -112,6 +124,7 @@ public: virtual bool Write(double d, const char* tag); virtual bool Write(const char* s, const char* tag); virtual bool Write(const char* buf, int len, const char* tag); + virtual bool Write(const string& s, const char* tag); virtual bool WriteOpenTag(const char* tag); virtual bool WriteCloseTag(const char* tag); virtual bool WriteSeparator(); @@ -126,6 +139,7 @@ public: virtual bool Read(bool* v, const char* tag); virtual bool Read(double* d, const char* tag); virtual bool Read(char** str, int* len, const char* tag); + virtual bool Read(string* s, const char* tag); private: // Encodes non-printable characters. diff --git a/src/bro.bif b/src/bro.bif index 054fafec0f..b9140befb3 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -2108,6 +2108,12 @@ function request_remote_sync%(p: event_peer, auth: bool%) : bool return new Val(remote_serializer->RequestSync(id, auth), TYPE_BOOL); %} +function request_remote_logs%(p: event_peer%) : bool + %{ + RemoteSerializer::PeerID id = p->AsRecordVal()->Lookup(0)->AsCount(); + return new Val(remote_serializer->RequestLogs(id), TYPE_BOOL); + %} + function set_accept_state%(p: event_peer, accept: bool%) : bool %{ RemoteSerializer::PeerID id = p->AsRecordVal()->Lookup(0)->AsCount();