diff --git a/aux/binpac b/aux/binpac index 0f1ecfa972..a0990e61ad 160000 --- a/aux/binpac +++ b/aux/binpac @@ -1 +1 @@ -Subproject commit 0f1ecfa97236635fb93e013404e6b30d6c506ddd +Subproject commit a0990e61ad4a3705bda4cc5a20059af2d1bda4c3 diff --git a/aux/bro-aux b/aux/bro-aux index b1e75f6a21..7660b5f4c5 160000 --- a/aux/bro-aux +++ b/aux/bro-aux @@ -1 +1 @@ -Subproject commit b1e75f6a212250b1730a438f27fc778618b67ec3 +Subproject commit 7660b5f4c5be40aa5f3a7c8746fdcf68331f9b93 diff --git a/aux/broccoli b/aux/broccoli index ed52e3414b..765eab50f7 160000 --- a/aux/broccoli +++ b/aux/broccoli @@ -1 +1 @@ -Subproject commit ed52e3414b31b05ec9abed627b4153c8e2243441 +Subproject commit 765eab50f7796fdb3c308fe9232cd7891f098c67 diff --git a/aux/broctl b/aux/broctl index 73dbc79ac2..f6d451520e 160000 --- a/aux/broctl +++ b/aux/broctl @@ -1 +1 @@ -Subproject commit 73dbc79ac24cdfef07d8574a4da5d43056ba5fa5 +Subproject commit f6d451520eaaaae97aab6df2bb4e0aecb6b63e66 diff --git a/aux/broker b/aux/broker index 23def70c44..68a36ed814 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 23def70c44128d19138029615dd154359286e111 +Subproject commit 68a36ed81480ba935268bcaf7b6f2249d23436da diff --git a/aux/btest b/aux/btest index 9d5c7bcac9..32e582514a 160000 --- a/aux/btest +++ b/aux/btest @@ -1 +1 @@ -Subproject commit 9d5c7bcac9b04710931bc8a42b545f0691561b2f +Subproject commit 32e582514ae044befa8e0511083bf11a51408a1d diff --git a/aux/plugins b/aux/plugins index 2322840bcd..0a2f021527 160000 --- a/aux/plugins +++ b/aux/plugins @@ -1 +1 @@ -Subproject commit 2322840bcdbd618ae7bd24e22d874fb30ab89bbb +Subproject commit 0a2f0215270e6ceaf9c1312f705b95d2cce1b530 diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 4842f819b6..2adecfc89a 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -2730,8 +2730,7 @@ bool RemoteSerializer::ProcessLogCreateWriter() id_val = new EnumVal(id, internal_type("Log::ID")->AsEnumType()); writer_val = new EnumVal(writer, internal_type("Log::Writer")->AsEnumType()); - if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields, - true, false, true) ) + if ( ! log_mgr->CreateWriterForRemoteLog(id_val, writer_val, info, num_fields, fields) ) { delete_fields_up_to = num_fields; goto error; @@ -2803,7 +2802,7 @@ bool RemoteSerializer::ProcessLogWrite() id_val = new EnumVal(id, internal_type("Log::ID")->AsEnumType()); writer_val = new EnumVal(writer, internal_type("Log::Writer")->AsEnumType()); - success = log_mgr->Write(id_val, writer_val, path, num_fields, vals); + success = log_mgr->WriteFromRemote(id_val, writer_val, path, num_fields, vals); Unref(id_val); Unref(writer_val); diff --git a/src/broker/Data.cc b/src/broker/Data.cc index bc4197a974..35d815b9e8 100644 --- a/src/broker/Data.cc +++ b/src/broker/Data.cc @@ -709,3 +709,433 @@ bool bro_broker::DataVal::DoUnserialize(UnserialInfo* info) delete [] serial; return true; } + +static broker::util::optional threading_val_to_data_internal(TypeTag type, const threading::Value::_val& val) + { + switch ( type ) { + case TYPE_BOOL: + return {val.int_val != 0}; + + case TYPE_INT: + return {val.int_val}; + + case TYPE_COUNT: + case TYPE_COUNTER: + return {val.uint_val}; + + case TYPE_PORT: + return {broker::port(val.port_val.port, to_broker_port_proto(val.port_val.proto))}; + + case TYPE_ADDR: + { + IPAddr a; + + switch ( val.addr_val.family ) { + case IPv4: + a = IPAddr(val.addr_val.in.in4); + break; + + case IPv6: + a = IPAddr(val.addr_val.in.in6); + break; + + default: + reporter->InternalError("unsupported protocol family in threading_val_to_data"); + } + + in6_addr tmp; + a.CopyIPv6(&tmp); + return {broker::address(reinterpret_cast(&tmp), + broker::address::family::ipv6, + broker::address::byte_order::network)}; + } + + case TYPE_SUBNET: + { + IPAddr a; + int length; + + switch ( val.subnet_val.prefix.family ) { + case IPv4: + a = IPAddr(val.subnet_val.prefix.in.in4); + length = (val.subnet_val.length - 96); + break; + + case IPv6: + a = IPAddr(val.subnet_val.prefix.in.in6); + length = val.subnet_val.length; + break; + + default: + reporter->InternalError("unsupported protocol family in threading_val_to_data"); + } + + in6_addr tmp; + a.CopyIPv6(&tmp); + + auto s = broker::address(reinterpret_cast(&tmp), + broker::address::family::ipv6, + broker::address::byte_order::network); + fprintf(stderr, "%d\n", val.subnet_val.length); + return {broker::subnet(s, length)}; + } + + case TYPE_DOUBLE: + return {val.double_val}; + + case TYPE_TIME: + return {broker::time_point(val.double_val)}; + + case TYPE_INTERVAL: + return {broker::time_duration(val.double_val)}; + + case TYPE_ENUM: + return {broker::enum_value(std::string(val.string_val.data, val.string_val.length))}; + + case TYPE_STRING: + case TYPE_FILE: + case TYPE_FUNC: + return {std::string(val.string_val.data, val.string_val.length)}; + + case TYPE_TABLE: + { + auto s = broker::set(); + + for ( int i = 0; i < val.set_val.size; ++i ) + { + auto c = bro_broker::threading_val_to_data(val.set_val.vals[i]); + + if ( ! c ) + return {}; + + s.emplace(*c); + } + + return {move(s)}; + } + + case TYPE_VECTOR: + { + auto s = broker::vector(); + + for ( int i = 0; i < val.vector_val.size; ++i ) + { + auto c = bro_broker::threading_val_to_data(val.vector_val.vals[i]); + + if ( ! c ) + return {}; + + s.emplace_back(*c); + } + + return {move(s)}; + } + + default: + reporter->InternalError("unsupported type %s in threading_val_to_data", + type_name(type)); + } + + return {}; + } + + +broker::util::optional bro_broker::threading_val_to_data(const threading::Value* v) + { + broker::util::optional d; + + if ( v->present ) + { + d = threading_val_to_data_internal(v->type, v->val); + + if ( ! d ) + return {}; + } + + auto type = broker::record::field(static_cast(v->type)); + auto present = broker::record::field(v->present); + auto data = (v->present) ? broker::record::field(*d) : broker::util::optional(); + + return {broker::record({move(type), move(present), move(data)})}; + }; + +struct threading_val_converter { + using result_type = bool; + + TypeTag type; + threading::Value::_val& val; + + result_type operator()(bool a) + { + if ( type == TYPE_BOOL ) + { + val.int_val = (a ? 1 : 0); + return true; + } + + return false; + } + + result_type operator()(uint64_t a) + { + if ( type == TYPE_COUNT || type == TYPE_COUNTER ) + { + val.uint_val = a; + return true; + } + + return false; + } + + result_type operator()(int64_t a) + { + if ( type == TYPE_INT ) + { + val.int_val = a; + return true; + } + + return false; + } + + result_type operator()(double a) + { + if ( type == TYPE_DOUBLE ) + { + val.double_val = a; + return true; + } + + return false; + } + + + result_type operator()(const std::string& a) + { + if ( type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC ) + { + auto n = a.size(); + val.string_val.length = n; + val.string_val.data = new char[n]; + memcpy(val.string_val.data, a.data(), n); + return true; + } + + return false; + } + + result_type operator()(const broker::address& a) + { + if ( type == TYPE_ADDR ) + { + auto bits = reinterpret_cast(&a.bytes()); + auto b = IPAddr(*bits); + + if ( a.is_v4() ) + { + val.addr_val.family = IPv4; + b.CopyIPv4(&val.addr_val.in.in4); + return true; + } + + if ( a.is_v6() ) + { + val.addr_val.family = IPv6; + b.CopyIPv6(&val.addr_val.in.in6); + return true; + } + } + + return false; + } + + result_type operator()(const broker::subnet& s) + { + if ( type == TYPE_SUBNET ) + { + auto bits = reinterpret_cast(&s.network().bytes()); + auto a = IPAddr(*bits); + + val.subnet_val.length = s.length(); + + if ( s.network().is_v4() ) + { + val.subnet_val.prefix.family = IPv4; + a.CopyIPv4(&val.subnet_val.prefix.in.in4); + val.subnet_val.length += 96; + return true; + } + + if ( s.network().is_v6() ) + { + val.subnet_val.prefix.family = IPv6; + a.CopyIPv6(&val.subnet_val.prefix.in.in6); + return true; + } + } + + return false; + } + + result_type operator()(const broker::port& a) + { + if ( type == TYPE_PORT ) + { + val.port_val.port = a.number(); + val.port_val.proto = bro_broker::to_bro_port_proto(a.type()); + return true; + } + + return false; + } + + result_type operator()(const broker::time_point& a) + { + if ( type == TYPE_TIME ) + { + val.double_val = a.value; + return true; + } + + return false; + } + + result_type operator()(const broker::time_duration& a) + { + if ( type == TYPE_INTERVAL ) + { + val.double_val = a.value; + return true; + } + + return false; + } + + result_type operator()(const broker::enum_value& a) + { + if ( type == TYPE_ENUM ) + { + auto n = a.name.size(); + val.string_val.length = n; + val.string_val.data = new char[n]; + memcpy(val.string_val.data, a.name.data(), n); + return true; + } + + return false; + } + + result_type operator()(const broker::set& a) + { + if ( type == TYPE_TABLE ) + { + val.set_val.size = a.size(); + val.set_val.vals = new threading::Value* [val.set_val.size]; + + auto p = val.set_val.vals; + + for ( auto& i : a ) + *p++ = bro_broker::data_to_threading_val(move(i)); + + return true; + } + + return false; + } + + result_type operator()(const broker::table& a) + { + return false; + } + + result_type operator()(const broker::vector& a) + { + if ( type == TYPE_VECTOR ) + { + val.vector_val.size = a.size(); + val.vector_val.vals = new threading::Value* [val.set_val.size]; + + auto p = val.set_val.vals; + + for ( auto& i : a ) + *p++ = bro_broker::data_to_threading_val(move(i)); + + return true; + } + + return false; + } + + result_type operator()(const broker::record& a) + { + return false; + } +}; + +threading::Value* bro_broker::data_to_threading_val(broker::data d) + { + auto r = broker::get(d); + + if ( ! r ) + return nullptr; + + auto type = broker::get(*r->get(0));; + auto present = broker::get(*r->get(1));; + auto data = *r->get(2); + + if ( ! (type && present) ) + return nullptr; + + auto tv = new threading::Value; + tv->type = static_cast(*type); + tv->present = *present; + + if ( present && ! broker::visit(threading_val_converter{tv->type, tv->val}, data) ) + { + delete tv; + return nullptr; + } + + return tv; + } + +broker::data bro_broker::threading_field_to_data(const threading::Field* f) + { + auto name = broker::record::field(f->name); + auto type = broker::record::field(static_cast(f->type)); + auto subtype = broker::record::field(static_cast(f->subtype)); + auto optional = broker::record::field(f->optional); + + broker::util::optional secondary; + + if ( f->secondary_name ) + secondary = {f->secondary_name}; + + return move(broker::record({name, secondary, type, subtype, optional})); + } + +threading::Field* bro_broker::data_to_threading_field(broker::data d) + { + auto r = broker::get(d); + + if ( ! r ) + return nullptr; + + auto name = broker::get(*r->get(0)); + auto secondary = r->get(1); + auto type = broker::get(*r->get(2)); + auto subtype = broker::get(*r->get(3)); + auto optional = broker::get(*r->get(4)); + + if ( ! (name && type && subtype && optional) ) + return nullptr; + + if ( secondary && ! broker::is(*secondary) ) + return nullptr; + + return new threading::Field(name->c_str(), + secondary ? broker::get(*secondary)->c_str() : nullptr, + static_cast(*type), + static_cast(*subtype), + *optional); + } diff --git a/src/broker/Data.h b/src/broker/Data.h index 0045ad58ad..9e0c8120de 100644 --- a/src/broker/Data.h +++ b/src/broker/Data.h @@ -61,6 +61,36 @@ broker::util::optional val_to_data(Val* v); */ Val* data_to_val(broker::data d, BroType* type, bool require_log_attr = false); +/** + * Convert a Bro threading::Value to a Broker data value. + * @param v a Bro threading::Value. + * @return a Broker data value if the Bro threading::Value could be converted to one. + */ +broker::util::optional threading_val_to_data(const threading::Value* v); + +/** + * Convert a Bro threading::Field to a Broker data value. + * @param v a Bro threading::Field. + * @return a Broker data value if the Bro threading::Field could be converted to one. + */ +broker::data threading_field_to_data(const threading::Field* f); + +/** + * Convert a Broker data value to a Bro threading::Value. + * @param d a Broker data value. + * @return a pointer to a new Bro threading::Value or a nullptr if the conversion was not + * possible. + */ +threading::Value* data_to_threading_val(broker::data d); + +/** + * Convert a Broker data value to a Bro threading::Value. + * @param d a Broker data value. + * @return a pointer to a new Bro threading::Value or a nullptr if the conversion was not + * possible. + */ +threading::Field* data_to_threading_field(broker::data d); + /** * A Bro value which wraps a Broker data value. */ diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 334b7f84f5..76040b129c 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -20,10 +20,17 @@ using namespace std; VectorType* bro_broker::Manager::vector_of_data_type; EnumType* bro_broker::Manager::log_id_type; +EnumType* bro_broker::Manager::writer_id_type; int bro_broker::Manager::send_flags_self_idx; int bro_broker::Manager::send_flags_peers_idx; int bro_broker::Manager::send_flags_unsolicited_idx; +struct unref_guard { + unref_guard(Val* v) : val(v) {} + ~unref_guard() { Unref(val); } + Val* val; +}; + bro_broker::Manager::Manager() : iosource::IOSource(), next_timestamp(-1) { @@ -83,6 +90,7 @@ bool bro_broker::Manager::Enable(Val* broker_endpoint_flags) send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited"); log_id_type = internal_type("Log::ID")->AsEnumType(); + writer_id_type = internal_type("Log::Writer")->AsEnumType(); bro_broker::opaque_of_data_type = new OpaqueType("Broker::Data"); bro_broker::opaque_of_set_iterator = new OpaqueType("Broker::SetIterator"); @@ -206,8 +214,9 @@ bool bro_broker::Manager::Event(std::string topic, broker::message msg, int flag return true; } -bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* info, - int flags) +bool bro_broker::Manager::CreateLog(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, + int num_fields, const threading::Field* const * fields, int flags, + const string& peer) { if ( ! Enabled() ) return false; @@ -221,40 +230,82 @@ bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* i return false; } - broker::record column_data; + auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum()); - for ( auto i = 0u; i < static_cast(info->NumFields()); ++i ) + if ( ! writer_name ) { - if ( ! info->FieldDecl(i)->FindAttr(ATTR_LOG) ) - continue; + reporter->Error("Failed to remotely log: writer %d doesn't have name", + writer->AsEnum()); + return false; + } - auto field_val = columns->LookupWithDefault(i); + auto writer_info = info.ToBroker(); - if ( ! field_val ) - { - column_data.fields.emplace_back(broker::record::field{}); - continue; - } + broker::vector fields_data; - auto opt_field_data = val_to_data(field_val); - Unref(field_val); + for ( auto i = 0; i < num_fields; ++i ) + { + auto field_data = threading_field_to_data(fields[i]); + fields_data.push_back(move(field_data)); + } - if ( ! opt_field_data ) + // TODO: If peer is given, send message to just that one destination. + + std::string topic = std::string("bro/log/") + stream_name; + auto bstream_name = broker::enum_value(move(stream_name)); + auto bwriter_name = broker::enum_value(move(writer_name)); + broker::message msg{move("create"), move(bstream_name), move(bwriter_name), move(writer_info), move(fields_data)}; + endpoint->send(move(topic), move(msg), flags); + + return true; + } + +bool bro_broker::Manager::Log(EnumVal* stream, EnumVal* writer, string path, int num_vals, const threading::Value* const * vals, int flags) + { + if ( ! Enabled() ) + return false; + + auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum()); + + if ( ! stream_name ) + { + reporter->Error("Failed to remotely log: stream %d doesn't have name", + stream->AsEnum()); + return false; + } + + auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum()); + + if ( ! writer_name ) + { + reporter->Error("Failed to remotely log: writer %d doesn't have name", + writer->AsEnum()); + return false; + } + + broker::vector vals_data; + + for ( auto i = 0; i < num_vals; ++i ) + { + auto field_data = threading_val_to_data(vals[i]); + + if ( ! field_data ) { reporter->Error("Failed to remotely log stream %s: " - "unsupported type '%s'", - stream_name, - type_name(info->FieldDecl(i)->type->Tag())); + "unsupported type for field #%d", + stream_name, i); return false; } - column_data.fields.emplace_back( - broker::record::field{move(*opt_field_data)}); + vals_data.push_back(move(*field_data)); } - broker::message msg{broker::enum_value{stream_name}, move(column_data)}; std::string topic = std::string("bro/log/") + stream_name; + auto bstream_name = broker::enum_value(move(stream_name)); + auto bwriter_name = broker::enum_value(move(writer_name)); + broker::message msg{move("write"), move(bstream_name), move(bwriter_name), move(path), move(vals_data)}; endpoint->send(move(topic), move(msg), flags); + return true; } @@ -639,6 +690,8 @@ void bro_broker::Manager::Process() { switch ( u.status ) { case broker::outgoing_connection_status::tag::established: + log_mgr->SendAllWritersTo(u.peer_name); + if ( Broker::outgoing_connection_established ) { val_list* vl = new val_list; @@ -684,6 +737,8 @@ void bro_broker::Manager::Process() { switch ( u.status ) { case broker::incoming_connection_status::tag::established: + log_mgr->SendAllWritersTo(u.peer_name); + if ( Broker::incoming_connection_established ) { val_list* vl = new val_list; @@ -809,12 +864,6 @@ void bro_broker::Manager::Process() } } - struct unref_guard { - unref_guard(Val* v) : val(v) {} - ~unref_guard() { Unref(val); } - Val* val; - }; - for ( auto& ls : log_subscriptions ) { auto log_messages = ls.second.q.want_pop(); @@ -826,59 +875,19 @@ void bro_broker::Manager::Process() for ( auto& lm : log_messages ) { - if ( lm.size() != 2 ) + if ( lm.size() < 1 ) { - reporter->Warning("got bad remote log size: %zd (expect 2)", - lm.size()); + reporter->Warning("got bad remote log message, no type field"); continue; } - if ( ! broker::get(lm[0]) ) - { - reporter->Warning("got remote log w/o stream id: %d", - static_cast(broker::which(lm[0]))); - continue; - } - - if ( ! broker::get(lm[1]) ) - { - reporter->Warning("got remote log w/o columns: %d", - static_cast(broker::which(lm[1]))); - continue; - } - - auto stream_id = data_to_val(move(lm[0]), log_id_type); - - if ( ! stream_id ) - { - reporter->Warning("failed to unpack remote log stream id"); - continue; - } - - unref_guard stream_id_unreffer{stream_id}; - auto columns_type = log_mgr->StreamColumns(stream_id->AsEnumVal()); - - if ( ! columns_type ) - { - reporter->Warning("got remote log for unknown stream: %s", - stream_id->Type()->AsEnumType()->Lookup( - stream_id->AsEnum())); - continue; - } - - auto columns = data_to_val(move(lm[1]), columns_type, true); - - if ( ! columns ) - { - reporter->Warning("failed to unpack remote log stream columns" - " for stream: %s", - stream_id->Type()->AsEnumType()->Lookup( - stream_id->AsEnum())); - continue; - } - - log_mgr->Write(stream_id->AsEnumVal(), columns->AsRecordVal()); - Unref(columns); + if ( lm[0] == "create" ) + ProcessCreateLog(std::move(lm)); + else if ( lm[0] == "write" ) + ProcessWriteLog(std::move(lm)); + else + reporter->Warning("got remote log w/o known type: %d", + static_cast(broker::which(lm[0]))); } } @@ -979,6 +988,202 @@ void bro_broker::Manager::Process() next_timestamp = -1; } +bool bro_broker::Manager::ProcessCreateLog(broker::message msg) + { + if ( msg.size() != 5 ) + { + reporter->Warning("got bad remote log create size: %zd (expected 5)", + msg.size()); + return false; + } + + unsigned int idx = 1; // Skip type at index 0. + + // Get stream ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log create w/o stream id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto stream_id = data_to_val(move(msg[idx]), log_id_type); + + if ( ! stream_id ) + { + reporter->Warning("failed to unpack remote log stream id"); + return false; + } + + unref_guard stream_id_unreffer{stream_id}; + ++idx; + + // Get writer ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log w/o writer id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto writer_id = data_to_val(move(msg[idx]), writer_id_type); + + if ( ! writer_id ) + { + reporter->Warning("failed to unpack remote log writer id"); + return false; + } + + unref_guard writer_id_unreffer{writer_id}; + ++idx; + + // Get writer info. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log create w/o writer info id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto writer_info = std::unique_ptr(new logging::WriterBackend::WriterInfo); + + if ( ! writer_info->FromBroker(std::move(msg[idx])) ) + { + reporter->Warning("failed to unpack remote log writer info"); + return false; + } + + ++idx; + + // Get log fields. + + auto fields_data = broker::get(msg[idx]); + + if ( ! fields_data ) + { + reporter->Warning("failed to unpack remote log fields"); + return false; + } + + auto num_fields = fields_data->size(); + auto fields = new threading::Field* [num_fields]; + + for ( auto i = 0u; i < num_fields; ++i ) + { + if ( auto field = data_to_threading_field((*fields_data)[i]) ) + fields[i] = field; + else + { + reporter->Warning("failed to convert remote log field # %d", i); + return false; + } + } + + if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.get(), num_fields, fields) ) + { + ODesc d; + stream_id->Describe(&d); + reporter->Warning("failed to create remote log stream for %s locally", d.Description()); + } + + writer_info.release(); // log_mgr took ownership. + return true; + } + +bool bro_broker::Manager::ProcessWriteLog(broker::message msg) + { + if ( msg.size() != 5 ) + { + reporter->Warning("got bad remote log size: %zd (expected 5)", + msg.size()); + return false; + } + + unsigned int idx = 1; // Skip type at index 0. + + // Get stream ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log w/o stream id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto stream_id = data_to_val(move(msg[idx]), log_id_type); + + if ( ! stream_id ) + { + reporter->Warning("failed to unpack remote log stream id"); + return false; + } + + unref_guard stream_id_unreffer{stream_id}; + ++idx; + + // Get writer ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log w/o writer id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto writer_id = data_to_val(move(msg[idx]), writer_id_type); + + if ( ! writer_id ) + { + reporter->Warning("failed to unpack remote log writer id"); + return false; + } + + unref_guard writer_id_unreffer{writer_id}; + ++idx; + + // Get path. + + auto path = broker::get(msg[idx]); + + if ( ! path ) + { + reporter->Warning("failed to unpack remote log path"); + return false; + } + + ++idx; + + // Get log values. + + auto vals_data = broker::get(msg[idx]); + + if ( ! vals_data ) + { + reporter->Warning("failed to unpack remote log values"); + return false; + } + + auto num_vals = vals_data->size(); + auto vals = new threading::Value* [num_vals]; + + for ( auto i = 0u; i < num_vals; ++i ) + { + if ( auto val = data_to_threading_val((*vals_data)[i]) ) + vals[i] = val; + else + { + reporter->Warning("failed to convert remote log arg # %d", i); + return false; + } + } + + log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), *path, num_vals, vals); + return true; + } + bool bro_broker::Manager::AddStore(StoreHandleVal* handle) { if ( ! Enabled() ) diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 9fb7b9e328..10d5019b74 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -156,15 +156,34 @@ public: /** * Send a log entry to any interested peers. The topic name used is * implicitly "bro/log/". - * @param stream_id the stream to which the log entry belongs. - * @param columns the data which comprises the log entry. - * @param info the record type corresponding to the log's columns. + * @param stream the stream to which the log entry belongs. + * @param writer the writer to use for outputting this log entry. + * @param path the log path to output the log entry to. + * @param num_vals the number of fields to log. + * @param vals the log values to log, of size num_vals. * @param flags tune the behavior of how the message is send. * See the Broker::SendFlags record type. * @return true if the message is sent successfully. */ - bool Log(EnumVal* stream_id, RecordVal* columns, RecordType* info, - int flags); + bool Log(EnumVal* stream, EnumVal* writer, string path, int num_vals, + const threading::Value* const * vals, int flags); + + /** + * Send a message to create a log stream to any interested peers. + * The log stream may or may not already exist on the receiving side. + * The topic name used is implicitly "bro/log/". + * @param stream the stream to which the log entry belongs. + * @param writer the writer to use for outputting this log entry. + * @param info backend initialization information for the writer. + * @param num_fields the number of fields the log has. + * @param fields the log's fields of size num_fields. + * @param flags tune the behavior of how the message is send. + * See the Broker::SendFlags record type. + * @param peer If given, send the message only to this peer. + * @return true if the message is sent successfully. + */ + bool CreateLog(EnumVal* id, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, + int num_fields, const threading::Field* const * fields, int flags, const string& peer = ""); /** * Automatically send an event to any interested peers whenever it is @@ -325,7 +344,6 @@ public: static int send_flags_to_int(Val* flags); private: - // IOSource interface overrides: void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, iosource::FD_Set* except) override; @@ -340,6 +358,9 @@ private: broker::endpoint& Endpoint() { return *endpoint; } + bool ProcessCreateLog(broker::message msg); + bool ProcessWriteLog(broker::message msg); + struct QueueWithStats { broker::message_queue q; size_t received = 0; @@ -360,6 +381,7 @@ private: static VectorType* vector_of_data_type; static EnumType* log_id_type; + static EnumType* writer_id_type; static int send_flags_self_idx; static int send_flags_peers_idx; static int send_flags_unsolicited_idx; diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index e1a314d4d6..5bebdebdfb 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -919,12 +919,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) #endif } -#ifdef ENABLE_BROKER - if ( stream->enable_remote && - ! broker_mgr->Log(id, columns, stream->columns, stream->remote_flags) ) - stream->enable_remote = false; -#endif - Unref(columns); return true; @@ -1121,23 +1115,46 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, return vals; } +bool Manager::CreateWriterForRemoteLog(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, + int num_fields, const threading::Field* const* fields) + { + return CreateWriter(id, writer, info, num_fields, fields, true, false, true); + } + +static void delete_info_and_fields(WriterBackend::WriterInfo* info, int num_fields, const threading::Field* const* fields) + { + for ( int i = 0; i < num_fields; i++ ) + delete fields[i]; + + delete [] fields; + delete info; + } + WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, - int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote, + int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote, const string& instantiating_filter) { + WriterFrontend* result = 0; + Stream* stream = FindStream(id); if ( ! stream ) + { // Don't know this stream. + delete_info_and_fields(info, num_fields, fields); return 0; + } Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info->path)); if ( w != stream->writers.end() ) + { // If we already have a writer for this. That's fine, we just // return it. + delete_info_and_fields(info, num_fields, fields); return w->second->writer; + } WriterInfo* winfo = new WriterInfo; winfo->type = writer->Ref()->AsEnumVal(); @@ -1190,7 +1207,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken winfo->info->rotation_interval = winfo->interval; winfo->info->rotation_base = parse_rotate_base_time(base_time); - winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote); + winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote, stream->remote_flags); winfo->writer->Init(num_fields, fields); InstallRotationTimer(winfo); @@ -1207,8 +1224,8 @@ void Manager::DeleteVals(int num_fields, threading::Value** vals) delete [] vals; } -bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, - threading::Value** vals) +bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, string path, int num_fields, + threading::Value** vals) { Stream* stream = FindStream(id); @@ -1280,6 +1297,34 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer) } } +void Manager::SendAllWritersTo(const string& peer) + { +#ifdef ENABLE_BROKER + for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) + { + Stream* stream = (*s); + + if ( ! stream ) + continue; + + for ( Stream::WriterMap::iterator i = stream->writers.begin(); + i != stream->writers.end(); i++ ) + { + WriterFrontend* writer = i->second->writer; + + EnumVal writer_val(i->first.first, internal_type("Log::Writer")->AsEnumType()); + broker_mgr->CreateLog((*s)->id, + &writer_val, + *i->second->info, + writer->NumFields(), + writer->Fields(), + stream->remote_flags, + peer); + } + } +#endif + } + bool Manager::SetBuf(EnumVal* id, bool enabled) { Stream* stream = FindStream(id); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 5d3372fb9b..6852160169 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -129,6 +129,52 @@ public: */ bool Write(EnumVal* id, RecordVal* columns); + /** + * Create a new log writer frontend. This is exposed so that the + * communication system can recreated remote log streams locally. + * + * @param stream The enum value corresponding the log stream. + * + * @param writer The enum value corresponding the desired log writer. + * + * @param info A fully initialized object defining the + * characteristics of the backend writer instance. The method takes + * ownership of this. + * + * @param num_fields The number of log fields to write. + * + * @param vals An arry of log fields to write, of size num_fields. + * The method takes ownership of the arry. + * + * @return Returns true if the writer was successfully created. + */ + bool CreateWriterForRemoteLog(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, + int num_fields, const threading::Field* const* fields); + + /** + * Writes out log entries that have already passed through all + * filters, and have raised any events. This is meant called for logs + * received alrready processed from remote. + * + * @param stream The enum value corresponding the log stream. + * + * @param writer The enum value corresponding the desired log writer. + * + * @param path The path of the target log stream to write to. + * + * @param num_fields The number of log values to write. + * + * @param vals An arry of log values to write, of size num_fields. + * The method takes ownership of the arry. + */ + bool WriteFromRemote(EnumVal* stream, EnumVal* writer, string path, + int num_fields, threading::Value** vals); + + /** + * Announces all instantiated writers to a given Broker peer. + */ + void SendAllWritersTo(const string& peer); + /** * Sets log streams buffering state. This adjusts all associated * writers to the new state. @@ -203,10 +249,6 @@ protected: int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote, const string& instantiating_filter=""); - // Takes ownership of values.. - bool Write(EnumVal* id, EnumVal* writer, string path, - int num_fields, threading::Value** vals); - // Announces all instantiated writers to peer. void SendAllWritersTo(RemoteSerializer::PeerID peer); diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 3e868f067a..624b66b8e8 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -119,6 +119,63 @@ bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const return true; } +broker::data WriterBackend::WriterInfo::ToBroker() const + { + auto bpath = broker::record::field(path); + auto brotation_base = broker::record::field(rotation_base); + auto brotation_interval = broker::record::field(rotation_interval); + auto bnetwork_time = broker::record::field(network_time); + + auto t = broker::table(); + + for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i ) + { + auto key = std::string(i->first); + auto value = std::string(i->second); + t.insert(std::make_pair(key, value)); + } + + auto bconfig = broker::record::field(move(t)); + + return move(broker::record({bpath, brotation_base, brotation_interval, bnetwork_time, bconfig})); + } + +bool WriterBackend::WriterInfo::FromBroker(broker::data d) + { + auto r = broker::get(d); + + if ( ! r ) + return false; + + auto bpath = broker::get(*r->get(0)); + auto brotation_base = broker::get(*r->get(1)); + auto brotation_interval = broker::get(*r->get(2)); + auto bnetwork_time = broker::get(*r->get(3)); + auto bconfig = broker::get(*r->get(4)); + + if ( ! (bpath && brotation_base && brotation_interval && bnetwork_time && bconfig) ) + return false; + + path = copy_string(bpath->c_str()); + rotation_base = *brotation_base; + rotation_interval = *brotation_interval; + network_time = *bnetwork_time; + + for ( auto i : *bconfig ) + { + auto k = broker::get(i.first); + auto v = broker::get(i.second); + + if ( ! (k && v) ) + return false; + + auto p = std::make_pair(copy_string(k->c_str()), copy_string(v->c_str())); + config.insert(p); + } + + return true; + } + WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread() { num_fields = 0; diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index 2a93e8fefc..e17b7070c1 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -6,6 +6,7 @@ #define LOGGING_WRITERBACKEND_H #include "threading/MsgThread.h" +#include "broker/Data.h" #include "Component.h" @@ -110,15 +111,15 @@ public: } } - private: - const WriterInfo& operator=(const WriterInfo& other); // Disable. - - friend class ::RemoteSerializer; - // Note, these need to be adapted when changing the struct's // fields. They serialize/deserialize the struct. bool Read(SerializationFormat* fmt); bool Write(SerializationFormat* fmt) const; + broker::data ToBroker() const; + bool FromBroker(broker::data d); + + private: + const WriterInfo& operator=(const WriterInfo& other); // Disable. }; /** diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 14e131c755..05f4f6593a 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -2,6 +2,10 @@ #include "Net.h" #include "threading/SerialTypes.h" +#ifdef ENABLE_BROKER +#include "broker/Manager.h" +#endif + #include "Manager.h" #include "WriterFrontend.h" #include "WriterBackend.h" @@ -97,7 +101,7 @@ private: using namespace logging; -WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) +WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote, int arg_remote_flags) { stream = arg_stream; writer = arg_writer; @@ -108,6 +112,7 @@ WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVa buf = true; local = arg_local; remote = arg_remote; + remote_flags = arg_remote_flags; write_buffer = 0; write_buffer_pos = 0; info = new WriterBackend::WriterInfo(arg_info); @@ -167,12 +172,23 @@ void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields)); if ( remote ) + { remote_serializer->SendLogCreateWriter(stream, writer, *info, arg_num_fields, arg_fields); +#ifdef ENABLE_BROKER + broker_mgr->CreateLog(stream, + writer, + *info, + arg_num_fields, + arg_fields, + remote_flags); +#endif + } + } void WriterFrontend::Write(int arg_num_fields, Value** vals) @@ -191,12 +207,21 @@ void WriterFrontend::Write(int arg_num_fields, Value** vals) } if ( remote ) + { remote_serializer->SendLogWrite(stream, writer, info->path, num_fields, vals); + broker_mgr->Log(stream, + writer, + info->path, + num_fields, + vals, + remote_flags); + } + if ( ! backend ) { DeleteVals(arg_num_fields, vals); diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index e343f326bf..c39d2bdf3c 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -38,12 +38,15 @@ public: * * local: If true, the writer will instantiate a local backend. * - * remote: If true, the writer will forward all data to remote + * remote: If true, the writer will forward logs to remote * clients. * + * remote_flags: Broker flags controlling where remote logs are + * propagated to. + * * Frontends must only be instantiated by the main thread. */ - WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote); + WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote, int arg_remote_flags); /** * Destructor. @@ -214,6 +217,7 @@ protected: bool buf; // True if buffering is enabled (default). bool local; // True if logging locally. bool remote; // True if loggin remotely. + int remote_flags; // Broker propagation flags. const char* name; // Descriptive name of the WriterBackend::WriterInfo* info; // The writer information. diff --git a/testing/btest/Baseline/broker.remote_log/recv.recv.out b/testing/btest/Baseline/broker.remote_log/recv.recv.out index 2f4a31df51..e69de29bb2 100644 --- a/testing/btest/Baseline/broker.remote_log/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_log/recv.recv.out @@ -1,6 +0,0 @@ -wrote log, [msg=ping, nolog=no, num=0] -wrote log, [msg=ping, nolog=no, num=1] -wrote log, [msg=ping, nolog=no, num=2] -wrote log, [msg=ping, nolog=no, num=3] -wrote log, [msg=ping, nolog=no, num=4] -wrote log, [msg=ping, nolog=no, num=5] diff --git a/testing/btest/Baseline/broker.remote_log_types/recv.recv.out b/testing/btest/Baseline/broker.remote_log_types/recv.recv.out new file mode 100644 index 0000000000..e69de29bb2 diff --git a/testing/btest/Baseline/broker.remote_log_types/recv.test.log b/testing/btest/Baseline/broker.remote_log_types/recv.test.log new file mode 100644 index 0000000000..eb2b066cd4 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_log_types/recv.test.log @@ -0,0 +1,10 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#open 2017-02-11-02-17-35 +#fields b i e c p sn a d t iv s sc ss se vc ve f +#types bool int enum count port subnet addr double time interval string set[count] set[string] set[string] vector[count] vector[string] func +T -42 Test::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1486779455.703438 100.000000 hurz 1 AA (empty) 10,20,30 (empty) foo\x0a{ \x0aif (0 < i) \x0a\x09return (Foo);\x0aelse\x0a\x09return (Bar);\x0a\x0a} +#close 2017-02-11-02-17-35 diff --git a/testing/btest/Baseline/broker.remote_log_types/send.send.out b/testing/btest/Baseline/broker.remote_log_types/send.send.out new file mode 100644 index 0000000000..632279e697 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_log_types/send.send.out @@ -0,0 +1 @@ +Broker::outgoing_connection_established, 127.0.0.1, 9999/tcp diff --git a/testing/btest/Baseline/broker.remote_log_types/send.test.log b/testing/btest/Baseline/broker.remote_log_types/send.test.log new file mode 100644 index 0000000000..59987c5998 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_log_types/send.test.log @@ -0,0 +1,10 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#open 2017-02-11-02-17-35 +#fields b i e c p sn a d t iv s sc ss se vc ve f +#types bool int enum count port subnet addr double time interval string set[count] set[string] set[string] vector[count] vector[string] func +T -42 Test::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1486779455.703438 100.000000 hurz 1 AA (empty) 10,20,30 (empty) foo\x0a{ \x0aif (0 < i) \x0a\x09return (Foo);\x0aelse\x0a\x09return (Bar);\x0a\x0a} +#close 2017-02-11-02-17-36 diff --git a/testing/btest/broker/remote_log.test b/testing/btest/broker/remote_log.test index 5881ad6d92..598f3f6e46 100644 --- a/testing/btest/broker/remote_log.test +++ b/testing/btest/broker/remote_log.test @@ -12,25 +12,28 @@ @TEST-START-FILE common.bro + +global quit_receiver: event(); +global quit_sender: event(); + + module Test; export { - redef enum Log::ID += { LOG }; + redef enum Log::ID += { LOG }; - type Info: record { - msg: string &log; - nolog: string &default="no"; - num: count &log; - }; - - global log_test: event(rec: Test::Info); + type Info: record { + msg: string &log; + nolog: string &default="no"; + num: count &log; + }; } event bro_init() &priority=5 - { - Broker::enable(); - Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]); - } + { + Broker::enable(); + Log::create_stream(Test::LOG, [$columns=Test::Info]); + } @TEST-END-FILE @@ -40,58 +43,66 @@ const broker_port: port &redef; redef exit_only_after_terminate = T; event bro_init() - { - Broker::subscribe_to_logs("bro/log/"); - Broker::listen(broker_port, "127.0.0.1"); - } + { + Broker::subscribe_to_logs("bro/log/"); + Broker::subscribe_to_events("bro/event/"); + Broker::listen(broker_port, "127.0.0.1"); + } -event Test::log_test(rec: Test::Info) +event quit_receiver() { - print "wrote log", rec; - - if ( rec$num == 5 ) - terminate(); + terminate(); } @TEST-END-FILE + @TEST-START-FILE send.bro const broker_port: port &redef; redef exit_only_after_terminate = T; event bro_init() - { - Broker::enable_remote_logs(Test::LOG); - Broker::connect("127.0.0.1", broker_port, 1secs); - } + { + Broker::enable_remote_logs(Test::LOG); + Broker::connect("127.0.0.1", broker_port, 1secs); + } global n = 0; event do_write() - { - if ( n == 6 ) - return; - else + { + if ( n == 6 ) { - Log::write(Test::LOG, [$msg = "ping", $num = n]); - ++n; - event do_write(); + local args = Broker::event_args(quit_receiver); + Broker::send_event("bro/event/", args); + schedule 1sec { quit_sender() }; } + else + { + Log::write(Test::LOG, [$msg = "ping", $num = n]); + ++n; + event do_write(); + } + } + +event quit_sender() + { + terminate(); } event Broker::outgoing_connection_established(peer_address: string, peer_port: port, peer_name: string) - { - print "Broker::outgoing_connection_established", peer_address, peer_port; - event do_write(); - } + { + print "Broker::outgoing_connection_established", peer_address, peer_port; + event do_write(); + } event Broker::outgoing_connection_broken(peer_address: string, peer_port: port) - { - terminate(); - } + { + terminate(); + } @TEST-END-FILE diff --git a/testing/btest/broker/remote_log_types.test b/testing/btest/broker/remote_log_types.test new file mode 100644 index 0000000000..9089e087cb --- /dev/null +++ b/testing/btest/broker/remote_log_types.test @@ -0,0 +1,133 @@ +# @TEST-SERIALIZE: brokercomm +# @TEST-REQUIRES: grep -q ENABLE_BROKER:BOOL=true $BUILD/CMakeCache.txt + +# @TEST-EXEC: btest-bg-run recv "bro -b ../common.bro ../recv.bro broker_port=$BROKER_PORT >recv.out" +# @TEST-EXEC: btest-bg-run send "bro -b ../common.bro ../send.bro broker_port=$BROKER_PORT >send.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff recv/test.log +# @TEST-EXEC: btest-diff send/send.out +# @TEST-EXEC: btest-diff send/test.log + +@TEST-START-FILE common.bro + + +global quit_receiver: event(); +global quit_sender: event(); + + +module Test; + +export { + redef enum Log::ID += { LOG }; + + type Info: record { + b: bool; + i: int; + e: Log::ID; + c: count; + p: port; + sn: subnet; + a: addr; + d: double; + t: time; + iv: interval; + s: string; + sc: set[count]; + ss: set[string]; + se: set[string]; + vc: vector of count; + ve: vector of string; + f: function(i: count) : string; + } &log; + +} + +event bro_init() &priority=5 + { + Broker::enable(); + Log::create_stream(Test::LOG, [$columns=Test::Info]); + } + +@TEST-END-FILE + +@TEST-START-FILE recv.bro + +const broker_port: port &redef; +redef exit_only_after_terminate = T; + +event bro_init() + { + Broker::subscribe_to_logs("bro/log/"); + Broker::subscribe_to_events("bro/event/"); + Broker::listen(broker_port, "127.0.0.1"); + } + +event quit_receiver() + { + terminate(); + } + +@TEST-END-FILE + + +@TEST-START-FILE send.bro + +const broker_port: port &redef; +redef exit_only_after_terminate = T; + +event bro_init() + { + Broker::enable_remote_logs(Test::LOG); + Broker::connect("127.0.0.1", broker_port, 1secs); + } + +event quit_sender() + { + terminate(); + } + +function foo(i : count) : string + { + if ( i > 0 ) + return "Foo"; + else + return "Bar"; + } + +event Broker::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + print "Broker::outgoing_connection_established", peer_address, peer_port; + + local empty_set: set[string]; + local empty_vector: vector of string; + + Log::write(Test::LOG, [ + $b=T, + $i=-42, + $e=Test::LOG, + $c=21, + $p=123/tcp, + $sn=10.0.0.1/24, + $a=1.2.3.4, + $d=3.14, + $t=network_time(), + $iv=100secs, + $s="hurz", + $sc=set(1), # set(1,2,3,4), # Output not stable for multi-element sets. + $ss=set("AA"), # set("AA", "BB", "CC") # Output not stable for multi-element sets. + $se=empty_set, + $vc=vector(10, 20, 30), + $ve=empty_vector, + $f=foo + ]); + + local args = Broker::event_args(quit_receiver); + Broker::send_event("bro/event/", args); + schedule 1sec { quit_sender() }; + } + +@TEST-END-FILE