From a5e9a535a5d2095e3409b238a8dc2299da5cd508 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Tue, 31 Jan 2017 21:51:31 -0800 Subject: [PATCH 01/16] Changing semantics of Broker's remote logging to match old communication framework. Broker had changed the semantics of remote logging: it sent over the original Bro record containing the values to be logged, which on the receiving side would then pass through the logging framework normally, including triggering filters and events. The old communication system however special-cases logs: it sends already processed log entries, just as they go into the log files, and without any receiver-side filtering etc. This more efficient as it short-cuts the processing path, and also avoids the more expensive Val serialization. It also lets the sender determine the specifics of what gets logged (and how). This commit changes Broker over to now use the same semantics as the old communication system. TODOs: - The new Broker code doesn't have consistent #ifdefs yet. - Right now, when a new log receiver connects, all existing logs are broadcasted out again to all current clients. That doesn't so any harm, but is unncessary. Need to add a way to send the existing logs to just the new client. --- aux/binpac | 2 +- aux/bro-aux | 2 +- aux/broccoli | 2 +- aux/broctl | 2 +- aux/broker | 2 +- aux/btest | 2 +- aux/plugins | 2 +- src/RemoteSerializer.cc | 5 +- src/broker/Data.cc | 430 ++++++++++++++++++ src/broker/Data.h | 30 ++ src/broker/Manager.cc | 357 +++++++++++---- src/broker/Manager.h | 34 +- src/logging/Manager.cc | 65 ++- src/logging/Manager.h | 50 +- src/logging/WriterBackend.cc | 57 +++ src/logging/WriterBackend.h | 11 +- src/logging/WriterFrontend.cc | 27 +- src/logging/WriterFrontend.h | 8 +- .../Baseline/broker.remote_log/recv.recv.out | 6 - .../broker.remote_log_types/recv.recv.out | 0 .../broker.remote_log_types/recv.test.log | 10 + .../broker.remote_log_types/send.send.out | 1 + .../broker.remote_log_types/send.test.log | 10 + testing/btest/broker/remote_log.test | 89 ++-- testing/btest/broker/remote_log_types.test | 133 ++++++ 25 files changed, 1178 insertions(+), 159 deletions(-) create mode 100644 testing/btest/Baseline/broker.remote_log_types/recv.recv.out create mode 100644 testing/btest/Baseline/broker.remote_log_types/recv.test.log create mode 100644 testing/btest/Baseline/broker.remote_log_types/send.send.out create mode 100644 testing/btest/Baseline/broker.remote_log_types/send.test.log create mode 100644 testing/btest/broker/remote_log_types.test diff --git a/aux/binpac b/aux/binpac index 0f1ecfa972..a0990e61ad 160000 --- a/aux/binpac +++ b/aux/binpac @@ -1 +1 @@ -Subproject commit 0f1ecfa97236635fb93e013404e6b30d6c506ddd +Subproject commit a0990e61ad4a3705bda4cc5a20059af2d1bda4c3 diff --git a/aux/bro-aux b/aux/bro-aux index b1e75f6a21..7660b5f4c5 160000 --- a/aux/bro-aux +++ b/aux/bro-aux @@ -1 +1 @@ -Subproject commit b1e75f6a212250b1730a438f27fc778618b67ec3 +Subproject commit 7660b5f4c5be40aa5f3a7c8746fdcf68331f9b93 diff --git a/aux/broccoli b/aux/broccoli index ed52e3414b..765eab50f7 160000 --- a/aux/broccoli +++ b/aux/broccoli @@ -1 +1 @@ -Subproject commit ed52e3414b31b05ec9abed627b4153c8e2243441 +Subproject commit 765eab50f7796fdb3c308fe9232cd7891f098c67 diff --git a/aux/broctl b/aux/broctl index 73dbc79ac2..f6d451520e 160000 --- a/aux/broctl +++ b/aux/broctl @@ -1 +1 @@ -Subproject commit 73dbc79ac24cdfef07d8574a4da5d43056ba5fa5 +Subproject commit f6d451520eaaaae97aab6df2bb4e0aecb6b63e66 diff --git a/aux/broker b/aux/broker index 23def70c44..68a36ed814 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 23def70c44128d19138029615dd154359286e111 +Subproject commit 68a36ed81480ba935268bcaf7b6f2249d23436da diff --git a/aux/btest b/aux/btest index 9d5c7bcac9..32e582514a 160000 --- a/aux/btest +++ b/aux/btest @@ -1 +1 @@ -Subproject commit 9d5c7bcac9b04710931bc8a42b545f0691561b2f +Subproject commit 32e582514ae044befa8e0511083bf11a51408a1d diff --git a/aux/plugins b/aux/plugins index 2322840bcd..0a2f021527 160000 --- a/aux/plugins +++ b/aux/plugins @@ -1 +1 @@ -Subproject commit 2322840bcdbd618ae7bd24e22d874fb30ab89bbb +Subproject commit 0a2f0215270e6ceaf9c1312f705b95d2cce1b530 diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 4842f819b6..2adecfc89a 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -2730,8 +2730,7 @@ bool RemoteSerializer::ProcessLogCreateWriter() id_val = new EnumVal(id, internal_type("Log::ID")->AsEnumType()); writer_val = new EnumVal(writer, internal_type("Log::Writer")->AsEnumType()); - if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields, - true, false, true) ) + if ( ! log_mgr->CreateWriterForRemoteLog(id_val, writer_val, info, num_fields, fields) ) { delete_fields_up_to = num_fields; goto error; @@ -2803,7 +2802,7 @@ bool RemoteSerializer::ProcessLogWrite() id_val = new EnumVal(id, internal_type("Log::ID")->AsEnumType()); writer_val = new EnumVal(writer, internal_type("Log::Writer")->AsEnumType()); - success = log_mgr->Write(id_val, writer_val, path, num_fields, vals); + success = log_mgr->WriteFromRemote(id_val, writer_val, path, num_fields, vals); Unref(id_val); Unref(writer_val); diff --git a/src/broker/Data.cc b/src/broker/Data.cc index bc4197a974..35d815b9e8 100644 --- a/src/broker/Data.cc +++ b/src/broker/Data.cc @@ -709,3 +709,433 @@ bool bro_broker::DataVal::DoUnserialize(UnserialInfo* info) delete [] serial; return true; } + +static broker::util::optional threading_val_to_data_internal(TypeTag type, const threading::Value::_val& val) + { + switch ( type ) { + case TYPE_BOOL: + return {val.int_val != 0}; + + case TYPE_INT: + return {val.int_val}; + + case TYPE_COUNT: + case TYPE_COUNTER: + return {val.uint_val}; + + case TYPE_PORT: + return {broker::port(val.port_val.port, to_broker_port_proto(val.port_val.proto))}; + + case TYPE_ADDR: + { + IPAddr a; + + switch ( val.addr_val.family ) { + case IPv4: + a = IPAddr(val.addr_val.in.in4); + break; + + case IPv6: + a = IPAddr(val.addr_val.in.in6); + break; + + default: + reporter->InternalError("unsupported protocol family in threading_val_to_data"); + } + + in6_addr tmp; + a.CopyIPv6(&tmp); + return {broker::address(reinterpret_cast(&tmp), + broker::address::family::ipv6, + broker::address::byte_order::network)}; + } + + case TYPE_SUBNET: + { + IPAddr a; + int length; + + switch ( val.subnet_val.prefix.family ) { + case IPv4: + a = IPAddr(val.subnet_val.prefix.in.in4); + length = (val.subnet_val.length - 96); + break; + + case IPv6: + a = IPAddr(val.subnet_val.prefix.in.in6); + length = val.subnet_val.length; + break; + + default: + reporter->InternalError("unsupported protocol family in threading_val_to_data"); + } + + in6_addr tmp; + a.CopyIPv6(&tmp); + + auto s = broker::address(reinterpret_cast(&tmp), + broker::address::family::ipv6, + broker::address::byte_order::network); + fprintf(stderr, "%d\n", val.subnet_val.length); + return {broker::subnet(s, length)}; + } + + case TYPE_DOUBLE: + return {val.double_val}; + + case TYPE_TIME: + return {broker::time_point(val.double_val)}; + + case TYPE_INTERVAL: + return {broker::time_duration(val.double_val)}; + + case TYPE_ENUM: + return {broker::enum_value(std::string(val.string_val.data, val.string_val.length))}; + + case TYPE_STRING: + case TYPE_FILE: + case TYPE_FUNC: + return {std::string(val.string_val.data, val.string_val.length)}; + + case TYPE_TABLE: + { + auto s = broker::set(); + + for ( int i = 0; i < val.set_val.size; ++i ) + { + auto c = bro_broker::threading_val_to_data(val.set_val.vals[i]); + + if ( ! c ) + return {}; + + s.emplace(*c); + } + + return {move(s)}; + } + + case TYPE_VECTOR: + { + auto s = broker::vector(); + + for ( int i = 0; i < val.vector_val.size; ++i ) + { + auto c = bro_broker::threading_val_to_data(val.vector_val.vals[i]); + + if ( ! c ) + return {}; + + s.emplace_back(*c); + } + + return {move(s)}; + } + + default: + reporter->InternalError("unsupported type %s in threading_val_to_data", + type_name(type)); + } + + return {}; + } + + +broker::util::optional bro_broker::threading_val_to_data(const threading::Value* v) + { + broker::util::optional d; + + if ( v->present ) + { + d = threading_val_to_data_internal(v->type, v->val); + + if ( ! d ) + return {}; + } + + auto type = broker::record::field(static_cast(v->type)); + auto present = broker::record::field(v->present); + auto data = (v->present) ? broker::record::field(*d) : broker::util::optional(); + + return {broker::record({move(type), move(present), move(data)})}; + }; + +struct threading_val_converter { + using result_type = bool; + + TypeTag type; + threading::Value::_val& val; + + result_type operator()(bool a) + { + if ( type == TYPE_BOOL ) + { + val.int_val = (a ? 1 : 0); + return true; + } + + return false; + } + + result_type operator()(uint64_t a) + { + if ( type == TYPE_COUNT || type == TYPE_COUNTER ) + { + val.uint_val = a; + return true; + } + + return false; + } + + result_type operator()(int64_t a) + { + if ( type == TYPE_INT ) + { + val.int_val = a; + return true; + } + + return false; + } + + result_type operator()(double a) + { + if ( type == TYPE_DOUBLE ) + { + val.double_val = a; + return true; + } + + return false; + } + + + result_type operator()(const std::string& a) + { + if ( type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC ) + { + auto n = a.size(); + val.string_val.length = n; + val.string_val.data = new char[n]; + memcpy(val.string_val.data, a.data(), n); + return true; + } + + return false; + } + + result_type operator()(const broker::address& a) + { + if ( type == TYPE_ADDR ) + { + auto bits = reinterpret_cast(&a.bytes()); + auto b = IPAddr(*bits); + + if ( a.is_v4() ) + { + val.addr_val.family = IPv4; + b.CopyIPv4(&val.addr_val.in.in4); + return true; + } + + if ( a.is_v6() ) + { + val.addr_val.family = IPv6; + b.CopyIPv6(&val.addr_val.in.in6); + return true; + } + } + + return false; + } + + result_type operator()(const broker::subnet& s) + { + if ( type == TYPE_SUBNET ) + { + auto bits = reinterpret_cast(&s.network().bytes()); + auto a = IPAddr(*bits); + + val.subnet_val.length = s.length(); + + if ( s.network().is_v4() ) + { + val.subnet_val.prefix.family = IPv4; + a.CopyIPv4(&val.subnet_val.prefix.in.in4); + val.subnet_val.length += 96; + return true; + } + + if ( s.network().is_v6() ) + { + val.subnet_val.prefix.family = IPv6; + a.CopyIPv6(&val.subnet_val.prefix.in.in6); + return true; + } + } + + return false; + } + + result_type operator()(const broker::port& a) + { + if ( type == TYPE_PORT ) + { + val.port_val.port = a.number(); + val.port_val.proto = bro_broker::to_bro_port_proto(a.type()); + return true; + } + + return false; + } + + result_type operator()(const broker::time_point& a) + { + if ( type == TYPE_TIME ) + { + val.double_val = a.value; + return true; + } + + return false; + } + + result_type operator()(const broker::time_duration& a) + { + if ( type == TYPE_INTERVAL ) + { + val.double_val = a.value; + return true; + } + + return false; + } + + result_type operator()(const broker::enum_value& a) + { + if ( type == TYPE_ENUM ) + { + auto n = a.name.size(); + val.string_val.length = n; + val.string_val.data = new char[n]; + memcpy(val.string_val.data, a.name.data(), n); + return true; + } + + return false; + } + + result_type operator()(const broker::set& a) + { + if ( type == TYPE_TABLE ) + { + val.set_val.size = a.size(); + val.set_val.vals = new threading::Value* [val.set_val.size]; + + auto p = val.set_val.vals; + + for ( auto& i : a ) + *p++ = bro_broker::data_to_threading_val(move(i)); + + return true; + } + + return false; + } + + result_type operator()(const broker::table& a) + { + return false; + } + + result_type operator()(const broker::vector& a) + { + if ( type == TYPE_VECTOR ) + { + val.vector_val.size = a.size(); + val.vector_val.vals = new threading::Value* [val.set_val.size]; + + auto p = val.set_val.vals; + + for ( auto& i : a ) + *p++ = bro_broker::data_to_threading_val(move(i)); + + return true; + } + + return false; + } + + result_type operator()(const broker::record& a) + { + return false; + } +}; + +threading::Value* bro_broker::data_to_threading_val(broker::data d) + { + auto r = broker::get(d); + + if ( ! r ) + return nullptr; + + auto type = broker::get(*r->get(0));; + auto present = broker::get(*r->get(1));; + auto data = *r->get(2); + + if ( ! (type && present) ) + return nullptr; + + auto tv = new threading::Value; + tv->type = static_cast(*type); + tv->present = *present; + + if ( present && ! broker::visit(threading_val_converter{tv->type, tv->val}, data) ) + { + delete tv; + return nullptr; + } + + return tv; + } + +broker::data bro_broker::threading_field_to_data(const threading::Field* f) + { + auto name = broker::record::field(f->name); + auto type = broker::record::field(static_cast(f->type)); + auto subtype = broker::record::field(static_cast(f->subtype)); + auto optional = broker::record::field(f->optional); + + broker::util::optional secondary; + + if ( f->secondary_name ) + secondary = {f->secondary_name}; + + return move(broker::record({name, secondary, type, subtype, optional})); + } + +threading::Field* bro_broker::data_to_threading_field(broker::data d) + { + auto r = broker::get(d); + + if ( ! r ) + return nullptr; + + auto name = broker::get(*r->get(0)); + auto secondary = r->get(1); + auto type = broker::get(*r->get(2)); + auto subtype = broker::get(*r->get(3)); + auto optional = broker::get(*r->get(4)); + + if ( ! (name && type && subtype && optional) ) + return nullptr; + + if ( secondary && ! broker::is(*secondary) ) + return nullptr; + + return new threading::Field(name->c_str(), + secondary ? broker::get(*secondary)->c_str() : nullptr, + static_cast(*type), + static_cast(*subtype), + *optional); + } diff --git a/src/broker/Data.h b/src/broker/Data.h index 0045ad58ad..9e0c8120de 100644 --- a/src/broker/Data.h +++ b/src/broker/Data.h @@ -61,6 +61,36 @@ broker::util::optional val_to_data(Val* v); */ Val* data_to_val(broker::data d, BroType* type, bool require_log_attr = false); +/** + * Convert a Bro threading::Value to a Broker data value. + * @param v a Bro threading::Value. + * @return a Broker data value if the Bro threading::Value could be converted to one. + */ +broker::util::optional threading_val_to_data(const threading::Value* v); + +/** + * Convert a Bro threading::Field to a Broker data value. + * @param v a Bro threading::Field. + * @return a Broker data value if the Bro threading::Field could be converted to one. + */ +broker::data threading_field_to_data(const threading::Field* f); + +/** + * Convert a Broker data value to a Bro threading::Value. + * @param d a Broker data value. + * @return a pointer to a new Bro threading::Value or a nullptr if the conversion was not + * possible. + */ +threading::Value* data_to_threading_val(broker::data d); + +/** + * Convert a Broker data value to a Bro threading::Value. + * @param d a Broker data value. + * @return a pointer to a new Bro threading::Value or a nullptr if the conversion was not + * possible. + */ +threading::Field* data_to_threading_field(broker::data d); + /** * A Bro value which wraps a Broker data value. */ diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 334b7f84f5..76040b129c 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -20,10 +20,17 @@ using namespace std; VectorType* bro_broker::Manager::vector_of_data_type; EnumType* bro_broker::Manager::log_id_type; +EnumType* bro_broker::Manager::writer_id_type; int bro_broker::Manager::send_flags_self_idx; int bro_broker::Manager::send_flags_peers_idx; int bro_broker::Manager::send_flags_unsolicited_idx; +struct unref_guard { + unref_guard(Val* v) : val(v) {} + ~unref_guard() { Unref(val); } + Val* val; +}; + bro_broker::Manager::Manager() : iosource::IOSource(), next_timestamp(-1) { @@ -83,6 +90,7 @@ bool bro_broker::Manager::Enable(Val* broker_endpoint_flags) send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited"); log_id_type = internal_type("Log::ID")->AsEnumType(); + writer_id_type = internal_type("Log::Writer")->AsEnumType(); bro_broker::opaque_of_data_type = new OpaqueType("Broker::Data"); bro_broker::opaque_of_set_iterator = new OpaqueType("Broker::SetIterator"); @@ -206,8 +214,9 @@ bool bro_broker::Manager::Event(std::string topic, broker::message msg, int flag return true; } -bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* info, - int flags) +bool bro_broker::Manager::CreateLog(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, + int num_fields, const threading::Field* const * fields, int flags, + const string& peer) { if ( ! Enabled() ) return false; @@ -221,40 +230,82 @@ bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* i return false; } - broker::record column_data; + auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum()); - for ( auto i = 0u; i < static_cast(info->NumFields()); ++i ) + if ( ! writer_name ) { - if ( ! info->FieldDecl(i)->FindAttr(ATTR_LOG) ) - continue; + reporter->Error("Failed to remotely log: writer %d doesn't have name", + writer->AsEnum()); + return false; + } - auto field_val = columns->LookupWithDefault(i); + auto writer_info = info.ToBroker(); - if ( ! field_val ) - { - column_data.fields.emplace_back(broker::record::field{}); - continue; - } + broker::vector fields_data; - auto opt_field_data = val_to_data(field_val); - Unref(field_val); + for ( auto i = 0; i < num_fields; ++i ) + { + auto field_data = threading_field_to_data(fields[i]); + fields_data.push_back(move(field_data)); + } - if ( ! opt_field_data ) + // TODO: If peer is given, send message to just that one destination. + + std::string topic = std::string("bro/log/") + stream_name; + auto bstream_name = broker::enum_value(move(stream_name)); + auto bwriter_name = broker::enum_value(move(writer_name)); + broker::message msg{move("create"), move(bstream_name), move(bwriter_name), move(writer_info), move(fields_data)}; + endpoint->send(move(topic), move(msg), flags); + + return true; + } + +bool bro_broker::Manager::Log(EnumVal* stream, EnumVal* writer, string path, int num_vals, const threading::Value* const * vals, int flags) + { + if ( ! Enabled() ) + return false; + + auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum()); + + if ( ! stream_name ) + { + reporter->Error("Failed to remotely log: stream %d doesn't have name", + stream->AsEnum()); + return false; + } + + auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum()); + + if ( ! writer_name ) + { + reporter->Error("Failed to remotely log: writer %d doesn't have name", + writer->AsEnum()); + return false; + } + + broker::vector vals_data; + + for ( auto i = 0; i < num_vals; ++i ) + { + auto field_data = threading_val_to_data(vals[i]); + + if ( ! field_data ) { reporter->Error("Failed to remotely log stream %s: " - "unsupported type '%s'", - stream_name, - type_name(info->FieldDecl(i)->type->Tag())); + "unsupported type for field #%d", + stream_name, i); return false; } - column_data.fields.emplace_back( - broker::record::field{move(*opt_field_data)}); + vals_data.push_back(move(*field_data)); } - broker::message msg{broker::enum_value{stream_name}, move(column_data)}; std::string topic = std::string("bro/log/") + stream_name; + auto bstream_name = broker::enum_value(move(stream_name)); + auto bwriter_name = broker::enum_value(move(writer_name)); + broker::message msg{move("write"), move(bstream_name), move(bwriter_name), move(path), move(vals_data)}; endpoint->send(move(topic), move(msg), flags); + return true; } @@ -639,6 +690,8 @@ void bro_broker::Manager::Process() { switch ( u.status ) { case broker::outgoing_connection_status::tag::established: + log_mgr->SendAllWritersTo(u.peer_name); + if ( Broker::outgoing_connection_established ) { val_list* vl = new val_list; @@ -684,6 +737,8 @@ void bro_broker::Manager::Process() { switch ( u.status ) { case broker::incoming_connection_status::tag::established: + log_mgr->SendAllWritersTo(u.peer_name); + if ( Broker::incoming_connection_established ) { val_list* vl = new val_list; @@ -809,12 +864,6 @@ void bro_broker::Manager::Process() } } - struct unref_guard { - unref_guard(Val* v) : val(v) {} - ~unref_guard() { Unref(val); } - Val* val; - }; - for ( auto& ls : log_subscriptions ) { auto log_messages = ls.second.q.want_pop(); @@ -826,59 +875,19 @@ void bro_broker::Manager::Process() for ( auto& lm : log_messages ) { - if ( lm.size() != 2 ) + if ( lm.size() < 1 ) { - reporter->Warning("got bad remote log size: %zd (expect 2)", - lm.size()); + reporter->Warning("got bad remote log message, no type field"); continue; } - if ( ! broker::get(lm[0]) ) - { - reporter->Warning("got remote log w/o stream id: %d", - static_cast(broker::which(lm[0]))); - continue; - } - - if ( ! broker::get(lm[1]) ) - { - reporter->Warning("got remote log w/o columns: %d", - static_cast(broker::which(lm[1]))); - continue; - } - - auto stream_id = data_to_val(move(lm[0]), log_id_type); - - if ( ! stream_id ) - { - reporter->Warning("failed to unpack remote log stream id"); - continue; - } - - unref_guard stream_id_unreffer{stream_id}; - auto columns_type = log_mgr->StreamColumns(stream_id->AsEnumVal()); - - if ( ! columns_type ) - { - reporter->Warning("got remote log for unknown stream: %s", - stream_id->Type()->AsEnumType()->Lookup( - stream_id->AsEnum())); - continue; - } - - auto columns = data_to_val(move(lm[1]), columns_type, true); - - if ( ! columns ) - { - reporter->Warning("failed to unpack remote log stream columns" - " for stream: %s", - stream_id->Type()->AsEnumType()->Lookup( - stream_id->AsEnum())); - continue; - } - - log_mgr->Write(stream_id->AsEnumVal(), columns->AsRecordVal()); - Unref(columns); + if ( lm[0] == "create" ) + ProcessCreateLog(std::move(lm)); + else if ( lm[0] == "write" ) + ProcessWriteLog(std::move(lm)); + else + reporter->Warning("got remote log w/o known type: %d", + static_cast(broker::which(lm[0]))); } } @@ -979,6 +988,202 @@ void bro_broker::Manager::Process() next_timestamp = -1; } +bool bro_broker::Manager::ProcessCreateLog(broker::message msg) + { + if ( msg.size() != 5 ) + { + reporter->Warning("got bad remote log create size: %zd (expected 5)", + msg.size()); + return false; + } + + unsigned int idx = 1; // Skip type at index 0. + + // Get stream ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log create w/o stream id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto stream_id = data_to_val(move(msg[idx]), log_id_type); + + if ( ! stream_id ) + { + reporter->Warning("failed to unpack remote log stream id"); + return false; + } + + unref_guard stream_id_unreffer{stream_id}; + ++idx; + + // Get writer ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log w/o writer id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto writer_id = data_to_val(move(msg[idx]), writer_id_type); + + if ( ! writer_id ) + { + reporter->Warning("failed to unpack remote log writer id"); + return false; + } + + unref_guard writer_id_unreffer{writer_id}; + ++idx; + + // Get writer info. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log create w/o writer info id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto writer_info = std::unique_ptr(new logging::WriterBackend::WriterInfo); + + if ( ! writer_info->FromBroker(std::move(msg[idx])) ) + { + reporter->Warning("failed to unpack remote log writer info"); + return false; + } + + ++idx; + + // Get log fields. + + auto fields_data = broker::get(msg[idx]); + + if ( ! fields_data ) + { + reporter->Warning("failed to unpack remote log fields"); + return false; + } + + auto num_fields = fields_data->size(); + auto fields = new threading::Field* [num_fields]; + + for ( auto i = 0u; i < num_fields; ++i ) + { + if ( auto field = data_to_threading_field((*fields_data)[i]) ) + fields[i] = field; + else + { + reporter->Warning("failed to convert remote log field # %d", i); + return false; + } + } + + if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.get(), num_fields, fields) ) + { + ODesc d; + stream_id->Describe(&d); + reporter->Warning("failed to create remote log stream for %s locally", d.Description()); + } + + writer_info.release(); // log_mgr took ownership. + return true; + } + +bool bro_broker::Manager::ProcessWriteLog(broker::message msg) + { + if ( msg.size() != 5 ) + { + reporter->Warning("got bad remote log size: %zd (expected 5)", + msg.size()); + return false; + } + + unsigned int idx = 1; // Skip type at index 0. + + // Get stream ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log w/o stream id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto stream_id = data_to_val(move(msg[idx]), log_id_type); + + if ( ! stream_id ) + { + reporter->Warning("failed to unpack remote log stream id"); + return false; + } + + unref_guard stream_id_unreffer{stream_id}; + ++idx; + + // Get writer ID. + + if ( ! broker::get(msg[idx]) ) + { + reporter->Warning("got remote log w/o writer id: %d", + static_cast(broker::which(msg[idx]))); + return false; + } + + auto writer_id = data_to_val(move(msg[idx]), writer_id_type); + + if ( ! writer_id ) + { + reporter->Warning("failed to unpack remote log writer id"); + return false; + } + + unref_guard writer_id_unreffer{writer_id}; + ++idx; + + // Get path. + + auto path = broker::get(msg[idx]); + + if ( ! path ) + { + reporter->Warning("failed to unpack remote log path"); + return false; + } + + ++idx; + + // Get log values. + + auto vals_data = broker::get(msg[idx]); + + if ( ! vals_data ) + { + reporter->Warning("failed to unpack remote log values"); + return false; + } + + auto num_vals = vals_data->size(); + auto vals = new threading::Value* [num_vals]; + + for ( auto i = 0u; i < num_vals; ++i ) + { + if ( auto val = data_to_threading_val((*vals_data)[i]) ) + vals[i] = val; + else + { + reporter->Warning("failed to convert remote log arg # %d", i); + return false; + } + } + + log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), *path, num_vals, vals); + return true; + } + bool bro_broker::Manager::AddStore(StoreHandleVal* handle) { if ( ! Enabled() ) diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 9fb7b9e328..10d5019b74 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -156,15 +156,34 @@ public: /** * Send a log entry to any interested peers. The topic name used is * implicitly "bro/log/". - * @param stream_id the stream to which the log entry belongs. - * @param columns the data which comprises the log entry. - * @param info the record type corresponding to the log's columns. + * @param stream the stream to which the log entry belongs. + * @param writer the writer to use for outputting this log entry. + * @param path the log path to output the log entry to. + * @param num_vals the number of fields to log. + * @param vals the log values to log, of size num_vals. * @param flags tune the behavior of how the message is send. * See the Broker::SendFlags record type. * @return true if the message is sent successfully. */ - bool Log(EnumVal* stream_id, RecordVal* columns, RecordType* info, - int flags); + bool Log(EnumVal* stream, EnumVal* writer, string path, int num_vals, + const threading::Value* const * vals, int flags); + + /** + * Send a message to create a log stream to any interested peers. + * The log stream may or may not already exist on the receiving side. + * The topic name used is implicitly "bro/log/". + * @param stream the stream to which the log entry belongs. + * @param writer the writer to use for outputting this log entry. + * @param info backend initialization information for the writer. + * @param num_fields the number of fields the log has. + * @param fields the log's fields of size num_fields. + * @param flags tune the behavior of how the message is send. + * See the Broker::SendFlags record type. + * @param peer If given, send the message only to this peer. + * @return true if the message is sent successfully. + */ + bool CreateLog(EnumVal* id, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, + int num_fields, const threading::Field* const * fields, int flags, const string& peer = ""); /** * Automatically send an event to any interested peers whenever it is @@ -325,7 +344,6 @@ public: static int send_flags_to_int(Val* flags); private: - // IOSource interface overrides: void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, iosource::FD_Set* except) override; @@ -340,6 +358,9 @@ private: broker::endpoint& Endpoint() { return *endpoint; } + bool ProcessCreateLog(broker::message msg); + bool ProcessWriteLog(broker::message msg); + struct QueueWithStats { broker::message_queue q; size_t received = 0; @@ -360,6 +381,7 @@ private: static VectorType* vector_of_data_type; static EnumType* log_id_type; + static EnumType* writer_id_type; static int send_flags_self_idx; static int send_flags_peers_idx; static int send_flags_unsolicited_idx; diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index e1a314d4d6..5bebdebdfb 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -919,12 +919,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns) #endif } -#ifdef ENABLE_BROKER - if ( stream->enable_remote && - ! broker_mgr->Log(id, columns, stream->columns, stream->remote_flags) ) - stream->enable_remote = false; -#endif - Unref(columns); return true; @@ -1121,23 +1115,46 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter, return vals; } +bool Manager::CreateWriterForRemoteLog(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, + int num_fields, const threading::Field* const* fields) + { + return CreateWriter(id, writer, info, num_fields, fields, true, false, true); + } + +static void delete_info_and_fields(WriterBackend::WriterInfo* info, int num_fields, const threading::Field* const* fields) + { + for ( int i = 0; i < num_fields; i++ ) + delete fields[i]; + + delete [] fields; + delete info; + } + WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, - int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote, + int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote, const string& instantiating_filter) { + WriterFrontend* result = 0; + Stream* stream = FindStream(id); if ( ! stream ) + { // Don't know this stream. + delete_info_and_fields(info, num_fields, fields); return 0; + } Stream::WriterMap::iterator w = stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info->path)); if ( w != stream->writers.end() ) + { // If we already have a writer for this. That's fine, we just // return it. + delete_info_and_fields(info, num_fields, fields); return w->second->writer; + } WriterInfo* winfo = new WriterInfo; winfo->type = writer->Ref()->AsEnumVal(); @@ -1190,7 +1207,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken winfo->info->rotation_interval = winfo->interval; winfo->info->rotation_base = parse_rotate_base_time(base_time); - winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote); + winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote, stream->remote_flags); winfo->writer->Init(num_fields, fields); InstallRotationTimer(winfo); @@ -1207,8 +1224,8 @@ void Manager::DeleteVals(int num_fields, threading::Value** vals) delete [] vals; } -bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields, - threading::Value** vals) +bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, string path, int num_fields, + threading::Value** vals) { Stream* stream = FindStream(id); @@ -1280,6 +1297,34 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer) } } +void Manager::SendAllWritersTo(const string& peer) + { +#ifdef ENABLE_BROKER + for ( vector::iterator s = streams.begin(); s != streams.end(); ++s ) + { + Stream* stream = (*s); + + if ( ! stream ) + continue; + + for ( Stream::WriterMap::iterator i = stream->writers.begin(); + i != stream->writers.end(); i++ ) + { + WriterFrontend* writer = i->second->writer; + + EnumVal writer_val(i->first.first, internal_type("Log::Writer")->AsEnumType()); + broker_mgr->CreateLog((*s)->id, + &writer_val, + *i->second->info, + writer->NumFields(), + writer->Fields(), + stream->remote_flags, + peer); + } + } +#endif + } + bool Manager::SetBuf(EnumVal* id, bool enabled) { Stream* stream = FindStream(id); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 5d3372fb9b..6852160169 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -129,6 +129,52 @@ public: */ bool Write(EnumVal* id, RecordVal* columns); + /** + * Create a new log writer frontend. This is exposed so that the + * communication system can recreated remote log streams locally. + * + * @param stream The enum value corresponding the log stream. + * + * @param writer The enum value corresponding the desired log writer. + * + * @param info A fully initialized object defining the + * characteristics of the backend writer instance. The method takes + * ownership of this. + * + * @param num_fields The number of log fields to write. + * + * @param vals An arry of log fields to write, of size num_fields. + * The method takes ownership of the arry. + * + * @return Returns true if the writer was successfully created. + */ + bool CreateWriterForRemoteLog(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info, + int num_fields, const threading::Field* const* fields); + + /** + * Writes out log entries that have already passed through all + * filters, and have raised any events. This is meant called for logs + * received alrready processed from remote. + * + * @param stream The enum value corresponding the log stream. + * + * @param writer The enum value corresponding the desired log writer. + * + * @param path The path of the target log stream to write to. + * + * @param num_fields The number of log values to write. + * + * @param vals An arry of log values to write, of size num_fields. + * The method takes ownership of the arry. + */ + bool WriteFromRemote(EnumVal* stream, EnumVal* writer, string path, + int num_fields, threading::Value** vals); + + /** + * Announces all instantiated writers to a given Broker peer. + */ + void SendAllWritersTo(const string& peer); + /** * Sets log streams buffering state. This adjusts all associated * writers to the new state. @@ -203,10 +249,6 @@ protected: int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote, const string& instantiating_filter=""); - // Takes ownership of values.. - bool Write(EnumVal* id, EnumVal* writer, string path, - int num_fields, threading::Value** vals); - // Announces all instantiated writers to peer. void SendAllWritersTo(RemoteSerializer::PeerID peer); diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 3e868f067a..624b66b8e8 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -119,6 +119,63 @@ bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const return true; } +broker::data WriterBackend::WriterInfo::ToBroker() const + { + auto bpath = broker::record::field(path); + auto brotation_base = broker::record::field(rotation_base); + auto brotation_interval = broker::record::field(rotation_interval); + auto bnetwork_time = broker::record::field(network_time); + + auto t = broker::table(); + + for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i ) + { + auto key = std::string(i->first); + auto value = std::string(i->second); + t.insert(std::make_pair(key, value)); + } + + auto bconfig = broker::record::field(move(t)); + + return move(broker::record({bpath, brotation_base, brotation_interval, bnetwork_time, bconfig})); + } + +bool WriterBackend::WriterInfo::FromBroker(broker::data d) + { + auto r = broker::get(d); + + if ( ! r ) + return false; + + auto bpath = broker::get(*r->get(0)); + auto brotation_base = broker::get(*r->get(1)); + auto brotation_interval = broker::get(*r->get(2)); + auto bnetwork_time = broker::get(*r->get(3)); + auto bconfig = broker::get(*r->get(4)); + + if ( ! (bpath && brotation_base && brotation_interval && bnetwork_time && bconfig) ) + return false; + + path = copy_string(bpath->c_str()); + rotation_base = *brotation_base; + rotation_interval = *brotation_interval; + network_time = *bnetwork_time; + + for ( auto i : *bconfig ) + { + auto k = broker::get(i.first); + auto v = broker::get(i.second); + + if ( ! (k && v) ) + return false; + + auto p = std::make_pair(copy_string(k->c_str()), copy_string(v->c_str())); + config.insert(p); + } + + return true; + } + WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread() { num_fields = 0; diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index 2a93e8fefc..e17b7070c1 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -6,6 +6,7 @@ #define LOGGING_WRITERBACKEND_H #include "threading/MsgThread.h" +#include "broker/Data.h" #include "Component.h" @@ -110,15 +111,15 @@ public: } } - private: - const WriterInfo& operator=(const WriterInfo& other); // Disable. - - friend class ::RemoteSerializer; - // Note, these need to be adapted when changing the struct's // fields. They serialize/deserialize the struct. bool Read(SerializationFormat* fmt); bool Write(SerializationFormat* fmt) const; + broker::data ToBroker() const; + bool FromBroker(broker::data d); + + private: + const WriterInfo& operator=(const WriterInfo& other); // Disable. }; /** diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 14e131c755..05f4f6593a 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -2,6 +2,10 @@ #include "Net.h" #include "threading/SerialTypes.h" +#ifdef ENABLE_BROKER +#include "broker/Manager.h" +#endif + #include "Manager.h" #include "WriterFrontend.h" #include "WriterBackend.h" @@ -97,7 +101,7 @@ private: using namespace logging; -WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote) +WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote, int arg_remote_flags) { stream = arg_stream; writer = arg_writer; @@ -108,6 +112,7 @@ WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVa buf = true; local = arg_local; remote = arg_remote; + remote_flags = arg_remote_flags; write_buffer = 0; write_buffer_pos = 0; info = new WriterBackend::WriterInfo(arg_info); @@ -167,12 +172,23 @@ void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields) backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields)); if ( remote ) + { remote_serializer->SendLogCreateWriter(stream, writer, *info, arg_num_fields, arg_fields); +#ifdef ENABLE_BROKER + broker_mgr->CreateLog(stream, + writer, + *info, + arg_num_fields, + arg_fields, + remote_flags); +#endif + } + } void WriterFrontend::Write(int arg_num_fields, Value** vals) @@ -191,12 +207,21 @@ void WriterFrontend::Write(int arg_num_fields, Value** vals) } if ( remote ) + { remote_serializer->SendLogWrite(stream, writer, info->path, num_fields, vals); + broker_mgr->Log(stream, + writer, + info->path, + num_fields, + vals, + remote_flags); + } + if ( ! backend ) { DeleteVals(arg_num_fields, vals); diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index e343f326bf..c39d2bdf3c 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -38,12 +38,15 @@ public: * * local: If true, the writer will instantiate a local backend. * - * remote: If true, the writer will forward all data to remote + * remote: If true, the writer will forward logs to remote * clients. * + * remote_flags: Broker flags controlling where remote logs are + * propagated to. + * * Frontends must only be instantiated by the main thread. */ - WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote); + WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote, int arg_remote_flags); /** * Destructor. @@ -214,6 +217,7 @@ protected: bool buf; // True if buffering is enabled (default). bool local; // True if logging locally. bool remote; // True if loggin remotely. + int remote_flags; // Broker propagation flags. const char* name; // Descriptive name of the WriterBackend::WriterInfo* info; // The writer information. diff --git a/testing/btest/Baseline/broker.remote_log/recv.recv.out b/testing/btest/Baseline/broker.remote_log/recv.recv.out index 2f4a31df51..e69de29bb2 100644 --- a/testing/btest/Baseline/broker.remote_log/recv.recv.out +++ b/testing/btest/Baseline/broker.remote_log/recv.recv.out @@ -1,6 +0,0 @@ -wrote log, [msg=ping, nolog=no, num=0] -wrote log, [msg=ping, nolog=no, num=1] -wrote log, [msg=ping, nolog=no, num=2] -wrote log, [msg=ping, nolog=no, num=3] -wrote log, [msg=ping, nolog=no, num=4] -wrote log, [msg=ping, nolog=no, num=5] diff --git a/testing/btest/Baseline/broker.remote_log_types/recv.recv.out b/testing/btest/Baseline/broker.remote_log_types/recv.recv.out new file mode 100644 index 0000000000..e69de29bb2 diff --git a/testing/btest/Baseline/broker.remote_log_types/recv.test.log b/testing/btest/Baseline/broker.remote_log_types/recv.test.log new file mode 100644 index 0000000000..eb2b066cd4 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_log_types/recv.test.log @@ -0,0 +1,10 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#open 2017-02-11-02-17-35 +#fields b i e c p sn a d t iv s sc ss se vc ve f +#types bool int enum count port subnet addr double time interval string set[count] set[string] set[string] vector[count] vector[string] func +T -42 Test::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1486779455.703438 100.000000 hurz 1 AA (empty) 10,20,30 (empty) foo\x0a{ \x0aif (0 < i) \x0a\x09return (Foo);\x0aelse\x0a\x09return (Bar);\x0a\x0a} +#close 2017-02-11-02-17-35 diff --git a/testing/btest/Baseline/broker.remote_log_types/send.send.out b/testing/btest/Baseline/broker.remote_log_types/send.send.out new file mode 100644 index 0000000000..632279e697 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_log_types/send.send.out @@ -0,0 +1 @@ +Broker::outgoing_connection_established, 127.0.0.1, 9999/tcp diff --git a/testing/btest/Baseline/broker.remote_log_types/send.test.log b/testing/btest/Baseline/broker.remote_log_types/send.test.log new file mode 100644 index 0000000000..59987c5998 --- /dev/null +++ b/testing/btest/Baseline/broker.remote_log_types/send.test.log @@ -0,0 +1,10 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path test +#open 2017-02-11-02-17-35 +#fields b i e c p sn a d t iv s sc ss se vc ve f +#types bool int enum count port subnet addr double time interval string set[count] set[string] set[string] vector[count] vector[string] func +T -42 Test::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1486779455.703438 100.000000 hurz 1 AA (empty) 10,20,30 (empty) foo\x0a{ \x0aif (0 < i) \x0a\x09return (Foo);\x0aelse\x0a\x09return (Bar);\x0a\x0a} +#close 2017-02-11-02-17-36 diff --git a/testing/btest/broker/remote_log.test b/testing/btest/broker/remote_log.test index 5881ad6d92..598f3f6e46 100644 --- a/testing/btest/broker/remote_log.test +++ b/testing/btest/broker/remote_log.test @@ -12,25 +12,28 @@ @TEST-START-FILE common.bro + +global quit_receiver: event(); +global quit_sender: event(); + + module Test; export { - redef enum Log::ID += { LOG }; + redef enum Log::ID += { LOG }; - type Info: record { - msg: string &log; - nolog: string &default="no"; - num: count &log; - }; - - global log_test: event(rec: Test::Info); + type Info: record { + msg: string &log; + nolog: string &default="no"; + num: count &log; + }; } event bro_init() &priority=5 - { - Broker::enable(); - Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]); - } + { + Broker::enable(); + Log::create_stream(Test::LOG, [$columns=Test::Info]); + } @TEST-END-FILE @@ -40,58 +43,66 @@ const broker_port: port &redef; redef exit_only_after_terminate = T; event bro_init() - { - Broker::subscribe_to_logs("bro/log/"); - Broker::listen(broker_port, "127.0.0.1"); - } + { + Broker::subscribe_to_logs("bro/log/"); + Broker::subscribe_to_events("bro/event/"); + Broker::listen(broker_port, "127.0.0.1"); + } -event Test::log_test(rec: Test::Info) +event quit_receiver() { - print "wrote log", rec; - - if ( rec$num == 5 ) - terminate(); + terminate(); } @TEST-END-FILE + @TEST-START-FILE send.bro const broker_port: port &redef; redef exit_only_after_terminate = T; event bro_init() - { - Broker::enable_remote_logs(Test::LOG); - Broker::connect("127.0.0.1", broker_port, 1secs); - } + { + Broker::enable_remote_logs(Test::LOG); + Broker::connect("127.0.0.1", broker_port, 1secs); + } global n = 0; event do_write() - { - if ( n == 6 ) - return; - else + { + if ( n == 6 ) { - Log::write(Test::LOG, [$msg = "ping", $num = n]); - ++n; - event do_write(); + local args = Broker::event_args(quit_receiver); + Broker::send_event("bro/event/", args); + schedule 1sec { quit_sender() }; } + else + { + Log::write(Test::LOG, [$msg = "ping", $num = n]); + ++n; + event do_write(); + } + } + +event quit_sender() + { + terminate(); } event Broker::outgoing_connection_established(peer_address: string, peer_port: port, peer_name: string) - { - print "Broker::outgoing_connection_established", peer_address, peer_port; - event do_write(); - } + { + print "Broker::outgoing_connection_established", peer_address, peer_port; + event do_write(); + } event Broker::outgoing_connection_broken(peer_address: string, peer_port: port) - { - terminate(); - } + { + terminate(); + } @TEST-END-FILE diff --git a/testing/btest/broker/remote_log_types.test b/testing/btest/broker/remote_log_types.test new file mode 100644 index 0000000000..9089e087cb --- /dev/null +++ b/testing/btest/broker/remote_log_types.test @@ -0,0 +1,133 @@ +# @TEST-SERIALIZE: brokercomm +# @TEST-REQUIRES: grep -q ENABLE_BROKER:BOOL=true $BUILD/CMakeCache.txt + +# @TEST-EXEC: btest-bg-run recv "bro -b ../common.bro ../recv.bro broker_port=$BROKER_PORT >recv.out" +# @TEST-EXEC: btest-bg-run send "bro -b ../common.bro ../send.bro broker_port=$BROKER_PORT >send.out" + +# @TEST-EXEC: btest-bg-wait 20 +# @TEST-EXEC: btest-diff recv/recv.out +# @TEST-EXEC: btest-diff recv/test.log +# @TEST-EXEC: btest-diff send/send.out +# @TEST-EXEC: btest-diff send/test.log + +@TEST-START-FILE common.bro + + +global quit_receiver: event(); +global quit_sender: event(); + + +module Test; + +export { + redef enum Log::ID += { LOG }; + + type Info: record { + b: bool; + i: int; + e: Log::ID; + c: count; + p: port; + sn: subnet; + a: addr; + d: double; + t: time; + iv: interval; + s: string; + sc: set[count]; + ss: set[string]; + se: set[string]; + vc: vector of count; + ve: vector of string; + f: function(i: count) : string; + } &log; + +} + +event bro_init() &priority=5 + { + Broker::enable(); + Log::create_stream(Test::LOG, [$columns=Test::Info]); + } + +@TEST-END-FILE + +@TEST-START-FILE recv.bro + +const broker_port: port &redef; +redef exit_only_after_terminate = T; + +event bro_init() + { + Broker::subscribe_to_logs("bro/log/"); + Broker::subscribe_to_events("bro/event/"); + Broker::listen(broker_port, "127.0.0.1"); + } + +event quit_receiver() + { + terminate(); + } + +@TEST-END-FILE + + +@TEST-START-FILE send.bro + +const broker_port: port &redef; +redef exit_only_after_terminate = T; + +event bro_init() + { + Broker::enable_remote_logs(Test::LOG); + Broker::connect("127.0.0.1", broker_port, 1secs); + } + +event quit_sender() + { + terminate(); + } + +function foo(i : count) : string + { + if ( i > 0 ) + return "Foo"; + else + return "Bar"; + } + +event Broker::outgoing_connection_established(peer_address: string, + peer_port: port, + peer_name: string) + { + print "Broker::outgoing_connection_established", peer_address, peer_port; + + local empty_set: set[string]; + local empty_vector: vector of string; + + Log::write(Test::LOG, [ + $b=T, + $i=-42, + $e=Test::LOG, + $c=21, + $p=123/tcp, + $sn=10.0.0.1/24, + $a=1.2.3.4, + $d=3.14, + $t=network_time(), + $iv=100secs, + $s="hurz", + $sc=set(1), # set(1,2,3,4), # Output not stable for multi-element sets. + $ss=set("AA"), # set("AA", "BB", "CC") # Output not stable for multi-element sets. + $se=empty_set, + $vc=vector(10, 20, 30), + $ve=empty_vector, + $f=foo + ]); + + local args = Broker::event_args(quit_receiver); + Broker::send_event("bro/event/", args); + schedule 1sec { quit_sender() }; + } + +@TEST-END-FILE From b3a18f3c80ced6168487c09fa950e0b9b2c9ea9a Mon Sep 17 00:00:00 2001 From: Vlad Grigorescu Date: Wed, 15 Feb 2017 16:24:21 -0600 Subject: [PATCH 02/16] Kerberos ciphertext had some additional ASN.1 content being lumped in. --- src/analyzer/protocol/krb/krb-types.pac | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/analyzer/protocol/krb/krb-types.pac b/src/analyzer/protocol/krb/krb-types.pac index bb2bfba3e8..3b3b9d1f09 100644 --- a/src/analyzer/protocol/krb/krb-types.pac +++ b/src/analyzer/protocol/krb/krb-types.pac @@ -95,7 +95,7 @@ RecordVal* proc_ticket(const KRB_Ticket* ticket) rv->Assign(1, bytestring_to_val(ticket->realm()->data()->content())); rv->Assign(2, GetStringFromPrincipalName(ticket->sname())); rv->Assign(3, asn1_integer_to_val(ticket->enc_part()->data()->etype()->data(), TYPE_COUNT)); - rv->Assign(4, bytestring_to_val(ticket->enc_part()->data()->ciphertext())); + rv->Assign(4, bytestring_to_val(ticket->enc_part()->data()->ciphertext()->encoding()->content())); return rv; } @@ -162,7 +162,7 @@ type KRB_Encrypted_Data = record { true -> next_meta: ASN1EncodingMeta; false -> none_meta: empty; }; - ciphertext : bytestring &length=have_kvno ? next_meta.length : kvno_meta.length; + ciphertext : ASN1OctetString &length=have_kvno ? next_meta.length : kvno_meta.length; } &let { have_kvno : bool = kvno_meta.index == 1; }; From 511ca9e043c06729314db2f2c55cbb330b8d2d0f Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Fri, 17 Feb 2017 16:28:20 -0800 Subject: [PATCH 03/16] Adding Broker ifdefs for new remote logging code. --- src/logging/Manager.cc | 4 ++++ src/logging/Manager.h | 4 ++-- src/logging/WriterBackend.cc | 2 ++ src/logging/WriterBackend.h | 6 ++++++ src/logging/WriterFrontend.cc | 2 ++ 5 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index 5bebdebdfb..8c720137e8 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -1207,7 +1207,11 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken winfo->info->rotation_interval = winfo->interval; winfo->info->rotation_base = parse_rotate_base_time(base_time); +#ifdef ENABLE_BROKER winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote, stream->remote_flags); +#else + winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote, 0); +#endif winfo->writer->Init(num_fields, fields); InstallRotationTimer(winfo); diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 6852160169..3334942a3a 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -131,7 +131,7 @@ public: /** * Create a new log writer frontend. This is exposed so that the - * communication system can recreated remote log streams locally. + * communication system can recreate remote log streams locally. * * @param stream The enum value corresponding the log stream. * @@ -153,7 +153,7 @@ public: /** * Writes out log entries that have already passed through all - * filters, and have raised any events. This is meant called for logs + * filters (and have raised any events). This is meant called for logs * received alrready processed from remote. * * @param stream The enum value corresponding the log stream. diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index 624b66b8e8..e2afd360e7 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -119,6 +119,7 @@ bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const return true; } +#ifdef ENABLE_BROKER broker::data WriterBackend::WriterInfo::ToBroker() const { auto bpath = broker::record::field(path); @@ -175,6 +176,7 @@ bool WriterBackend::WriterInfo::FromBroker(broker::data d) return true; } +#endif WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread() { diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h index e17b7070c1..f6602cc8ca 100644 --- a/src/logging/WriterBackend.h +++ b/src/logging/WriterBackend.h @@ -6,7 +6,10 @@ #define LOGGING_WRITERBACKEND_H #include "threading/MsgThread.h" + +#ifdef ENABLE_BROKER #include "broker/Data.h" +#endif #include "Component.h" @@ -115,8 +118,11 @@ public: // fields. They serialize/deserialize the struct. bool Read(SerializationFormat* fmt); bool Write(SerializationFormat* fmt) const; + +#ifdef ENABLE_BROKER broker::data ToBroker() const; bool FromBroker(broker::data d); +#endif private: const WriterInfo& operator=(const WriterInfo& other); // Disable. diff --git a/src/logging/WriterFrontend.cc b/src/logging/WriterFrontend.cc index 05f4f6593a..0c2378890f 100644 --- a/src/logging/WriterFrontend.cc +++ b/src/logging/WriterFrontend.cc @@ -214,12 +214,14 @@ void WriterFrontend::Write(int arg_num_fields, Value** vals) num_fields, vals); +#ifdef ENABLE_BROKER broker_mgr->Log(stream, writer, info->path, num_fields, vals, remote_flags); +#endif } if ( ! backend ) From 7bbaa911b0ba4e102a17fc8e0f882aeb6509c235 Mon Sep 17 00:00:00 2001 From: Seth Hall Date: Wed, 22 Feb 2017 00:02:51 -0500 Subject: [PATCH 04/16] I missed one test I needed to update for the kerberos commit that I just pushed. --- .../kerberos.log | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/btest/Baseline/scripts.policy.protocols.krb.ticket-logging/kerberos.log b/testing/btest/Baseline/scripts.policy.protocols.krb.ticket-logging/kerberos.log index aee00fbf43..5645378a7e 100644 --- a/testing/btest/Baseline/scripts.policy.protocols.krb.ticket-logging/kerberos.log +++ b/testing/btest/Baseline/scripts.policy.protocols.krb.ticket-logging/kerberos.log @@ -3,8 +3,8 @@ #empty_field (empty) #unset_field - #path kerberos -#open 2017-02-18-18-38-58 +#open 2017-02-22-05-02-14 #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p request_type client service success error_msg from till cipher forwardable renewable client_cert_subject client_cert_fuid server_cert_subject server_cert_fuid auth_ticket new_ticket #types time string addr port addr port string string string bool string time time string bool bool string string string string string string -1429583645.478441 CHhAvVGS1DHFjwGM9 192.168.1.31 64889 192.168.1.32 88 TGS vladg/VLADG.NET krbtgt/VLADG.NET T - - 0.000000 aes256-cts-hmac-sha1-96 T F - - - - ea0f395c3823e8cc7216d82e2405b428 fa8064009b03606f84e475feca5dcd12 -#close 2017-02-18-18-38-58 +1429583645.478441 CHhAvVGS1DHFjwGM9 192.168.1.31 64889 192.168.1.32 88 TGS vladg/VLADG.NET krbtgt/VLADG.NET T - - 0.000000 aes256-cts-hmac-sha1-96 T F - - - - a09fbd89918320cc12a26d4f0c4e6aa2 396a9d9e8975cc5024a83c6e86101f06 +#close 2017-02-22-05-02-14 From ae6dbf17a23ecaf7ed2c38068d3fd8f3373928bb Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Thu, 23 Feb 2017 09:59:48 -0800 Subject: [PATCH 05/16] Input Manager: tiny error message fix. --- src/input/Manager.cc | 2 +- .../scripts.base.frameworks.input.missing-enum/bro..stderr | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/input/Manager.cc b/src/input/Manager.cc index b84d822101..d029e38092 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -2380,7 +2380,7 @@ Val* Manager::ValueToVal(const Stream* i, const Value* val, BroType* request_typ bro_int_t index = request_type->AsEnumType()->Lookup(module, var.c_str()); if ( index == -1 ) { - Warning(i, "Value not '%s' for stream '%s' is not a valid enum.", + Warning(i, "Value '%s' for stream '%s' is not a valid enum.", enum_string.c_str(), i->name.c_str()); have_error = true; diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.missing-enum/bro..stderr b/testing/btest/Baseline/scripts.base.frameworks.input.missing-enum/bro..stderr index 20207bcf94..8cd0c5ab6c 100644 --- a/testing/btest/Baseline/scripts.base.frameworks.input.missing-enum/bro..stderr +++ b/testing/btest/Baseline/scripts.base.frameworks.input.missing-enum/bro..stderr @@ -1,2 +1,2 @@ -warning: Value not 'IdoNot::Exist' for stream 'enum' is not a valid enum. +warning: Value 'IdoNot::Exist' for stream 'enum' is not a valid enum. received termination signal From e0a72b6e5c7bd9ad0b839d8a4b9297bb8a0211f7 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Thu, 23 Feb 2017 10:18:57 -0800 Subject: [PATCH 06/16] Updating submodule. --- CHANGES | 8 ++++++++ VERSION | 2 +- aux/btest | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGES b/CHANGES index 94f310e6b8..b9cef9e78a 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,12 @@ +2.5-76 | 2017-02-23 10:19:57 -0800 + + * Kerberos ciphertext had some additional ASN.1 content being lumped + in. (Vlad Grigorescu) + + * Updated Windows version detection to include Windows 10. (Fatema + Bannatwala, Keith Lehigh, Mike, Seth Hall). + 2.5-70 | 2017-02-20 00:20:02 -0500 * Rework the RADIUS base script. diff --git a/VERSION b/VERSION index b130560010..2087ec374d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.5-70 +2.5-76 diff --git a/aux/btest b/aux/btest index 9d5c7bcac9..2fb0e089b3 160000 --- a/aux/btest +++ b/aux/btest @@ -1 +1 @@ -Subproject commit 9d5c7bcac9b04710931bc8a42b545f0691561b2f +Subproject commit 2fb0e089b37cc62b4616c8d4309ee80fb7fc6a27 From 5b7636619984b5fd4a56e13f13bf973d014d2b59 Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Thu, 23 Feb 2017 15:00:13 -0800 Subject: [PATCH 07/16] Plugin: add/fix documentation for HookSetupAnalyzerTree --- src/plugin/Plugin.h | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/plugin/Plugin.h b/src/plugin/Plugin.h index 54451dcfb6..49fa7cdd84 100644 --- a/src/plugin/Plugin.h +++ b/src/plugin/Plugin.h @@ -39,7 +39,7 @@ enum HookType { HOOK_DRAIN_EVENTS, //< Activates Plugin::HookDrainEvents() HOOK_UPDATE_NETWORK_TIME, //< Activates Plugin::HookUpdateNetworkTime. HOOK_BRO_OBJ_DTOR, //< Activates Plugin::HookBroObjDtor. - HOOK_SETUP_ANALYZER_TREE, //< Activates Plugin::HookAddToAnalyzerTree + HOOK_SETUP_ANALYZER_TREE, //< Activates Plugin::HookSetupAnalyzerTree // Meta hooks. META_HOOK_PRE, //< Activates Plugin::MetaHookPre(). @@ -642,6 +642,13 @@ protected: */ virtual void HookUpdateNetworkTime(double network_time); + /** + * Hook that executes when a connection's initial analyzer tree + * has been fully set up. The hook can manipulate the tree at this time, + * for example by adding further analyzers. + * + * @param conn The connection. + */ virtual void HookSetupAnalyzerTree(Connection *conn); /** From 5cf7803e685eb147808934a372538c6301035f97 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Thu, 23 Feb 2017 17:18:43 -0800 Subject: [PATCH 08/16] Fix some minor issues. From Daniel, thanks! --- aux/binpac | 2 +- aux/bro-aux | 2 +- aux/broccoli | 2 +- aux/broctl | 2 +- aux/broker | 2 +- aux/btest | 2 +- aux/plugins | 2 +- src/broker/Data.cc | 11 +++++------ src/broker/Data.h | 2 +- src/broker/Manager.cc | 11 ++++++----- src/broker/Manager.h | 10 +++++----- src/logging/Manager.h | 18 +++++++++--------- src/logging/WriterBackend.cc | 4 ++-- src/logging/WriterFrontend.h | 4 +--- 14 files changed, 36 insertions(+), 38 deletions(-) diff --git a/aux/binpac b/aux/binpac index a0990e61ad..0f1ecfa972 160000 --- a/aux/binpac +++ b/aux/binpac @@ -1 +1 @@ -Subproject commit a0990e61ad4a3705bda4cc5a20059af2d1bda4c3 +Subproject commit 0f1ecfa97236635fb93e013404e6b30d6c506ddd diff --git a/aux/bro-aux b/aux/bro-aux index 7660b5f4c5..b1e75f6a21 160000 --- a/aux/bro-aux +++ b/aux/bro-aux @@ -1 +1 @@ -Subproject commit 7660b5f4c5be40aa5f3a7c8746fdcf68331f9b93 +Subproject commit b1e75f6a212250b1730a438f27fc778618b67ec3 diff --git a/aux/broccoli b/aux/broccoli index 765eab50f7..ed52e3414b 160000 --- a/aux/broccoli +++ b/aux/broccoli @@ -1 +1 @@ -Subproject commit 765eab50f7796fdb3c308fe9232cd7891f098c67 +Subproject commit ed52e3414b31b05ec9abed627b4153c8e2243441 diff --git a/aux/broctl b/aux/broctl index f6d451520e..73dbc79ac2 160000 --- a/aux/broctl +++ b/aux/broctl @@ -1 +1 @@ -Subproject commit f6d451520eaaaae97aab6df2bb4e0aecb6b63e66 +Subproject commit 73dbc79ac24cdfef07d8574a4da5d43056ba5fa5 diff --git a/aux/broker b/aux/broker index 68a36ed814..23def70c44 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit 68a36ed81480ba935268bcaf7b6f2249d23436da +Subproject commit 23def70c44128d19138029615dd154359286e111 diff --git a/aux/btest b/aux/btest index 32e582514a..2fb0e089b3 160000 --- a/aux/btest +++ b/aux/btest @@ -1 +1 @@ -Subproject commit 32e582514ae044befa8e0511083bf11a51408a1d +Subproject commit 2fb0e089b37cc62b4616c8d4309ee80fb7fc6a27 diff --git a/aux/plugins b/aux/plugins index 0a2f021527..2322840bcd 160000 --- a/aux/plugins +++ b/aux/plugins @@ -1 +1 @@ -Subproject commit 0a2f0215270e6ceaf9c1312f705b95d2cce1b530 +Subproject commit 2322840bcdbd618ae7bd24e22d874fb30ab89bbb diff --git a/src/broker/Data.cc b/src/broker/Data.cc index 35d815b9e8..5da6f5d4e0 100644 --- a/src/broker/Data.cc +++ b/src/broker/Data.cc @@ -776,7 +776,6 @@ static broker::util::optional threading_val_to_data_internal(TypeT auto s = broker::address(reinterpret_cast(&tmp), broker::address::family::ipv6, broker::address::byte_order::network); - fprintf(stderr, "%d\n", val.subnet_val.length); return {broker::subnet(s, length)}; } @@ -863,7 +862,7 @@ struct threading_val_converter { using result_type = bool; TypeTag type; - threading::Value::_val& val; + threading::Value::_val& val; result_type operator()(bool a) { @@ -1053,9 +1052,9 @@ struct threading_val_converter { if ( type == TYPE_VECTOR ) { val.vector_val.size = a.size(); - val.vector_val.vals = new threading::Value* [val.set_val.size]; + val.vector_val.vals = new threading::Value* [val.vector_val.size]; - auto p = val.set_val.vals; + auto p = val.vector_val.vals; for ( auto& i : a ) *p++ = bro_broker::data_to_threading_val(move(i)); @@ -1079,8 +1078,8 @@ threading::Value* bro_broker::data_to_threading_val(broker::data d) if ( ! r ) return nullptr; - auto type = broker::get(*r->get(0));; - auto present = broker::get(*r->get(1));; + auto type = broker::get(*r->get(0)); + auto present = broker::get(*r->get(1)); auto data = *r->get(2); if ( ! (type && present) ) diff --git a/src/broker/Data.h b/src/broker/Data.h index 9e0c8120de..526304b00c 100644 --- a/src/broker/Data.h +++ b/src/broker/Data.h @@ -70,7 +70,7 @@ broker::util::optional threading_val_to_data(const threading::Valu /** * Convert a Bro threading::Field to a Broker data value. - * @param v a Bro threading::Field. + * @param f a Bro threading::Field. * @return a Broker data value if the Bro threading::Field could be converted to one. */ broker::data threading_field_to_data(const threading::Field* f); diff --git a/src/broker/Manager.cc b/src/broker/Manager.cc index 76040b129c..eebcd2792f 100644 --- a/src/broker/Manager.cc +++ b/src/broker/Manager.cc @@ -214,9 +214,10 @@ bool bro_broker::Manager::Event(std::string topic, broker::message msg, int flag return true; } -bool bro_broker::Manager::CreateLog(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, - int num_fields, const threading::Field* const * fields, int flags, - const string& peer) +bool bro_broker::Manager::CreateLog(EnumVal* stream, EnumVal* writer, + const logging::WriterBackend::WriterInfo& info, + int num_fields, const threading::Field* const * fields, + int flags, const string& peer) { if ( ! Enabled() ) return false; @@ -1003,7 +1004,7 @@ bool bro_broker::Manager::ProcessCreateLog(broker::message msg) if ( ! broker::get(msg[idx]) ) { - reporter->Warning("got remote log create w/o stream id: %d", + reporter->Warning("got remote log create w/o stream id: %d", static_cast(broker::which(msg[idx]))); return false; } @@ -1023,7 +1024,7 @@ bool bro_broker::Manager::ProcessCreateLog(broker::message msg) if ( ! broker::get(msg[idx]) ) { - reporter->Warning("got remote log w/o writer id: %d", + reporter->Warning("got remote log create w/o writer id: %d", static_cast(broker::which(msg[idx]))); return false; } diff --git a/src/broker/Manager.h b/src/broker/Manager.h index 10d5019b74..d5701c56b1 100644 --- a/src/broker/Manager.h +++ b/src/broker/Manager.h @@ -169,20 +169,20 @@ public: const threading::Value* const * vals, int flags); /** - * Send a message to create a log stream to any interested peers. - * The log stream may or may not already exist on the receiving side. - * The topic name used is implicitly "bro/log/". + * Send a message to create a log stream to any interested peers. + * The log stream may or may not already exist on the receiving side. + * The topic name used is implicitly "bro/log/". * @param stream the stream to which the log entry belongs. * @param writer the writer to use for outputting this log entry. * @param info backend initialization information for the writer. * @param num_fields the number of fields the log has. - * @param fields the log's fields of size num_fields. + * @param fields the log's fields, of size num_fields. * @param flags tune the behavior of how the message is send. * See the Broker::SendFlags record type. * @param peer If given, send the message only to this peer. * @return true if the message is sent successfully. */ - bool CreateLog(EnumVal* id, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, + bool CreateLog(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const * fields, int flags, const string& peer = ""); /** diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 3334942a3a..56626f39d3 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -133,9 +133,9 @@ public: * Create a new log writer frontend. This is exposed so that the * communication system can recreate remote log streams locally. * - * @param stream The enum value corresponding the log stream. + * @param id The enum value corresponding to the log stream. * - * @param writer The enum value corresponding the desired log writer. + * @param writer The enum value corresponding to the desired log writer. * * @param info A fully initialized object defining the * characteristics of the backend writer instance. The method takes @@ -143,8 +143,8 @@ public: * * @param num_fields The number of log fields to write. * - * @param vals An arry of log fields to write, of size num_fields. - * The method takes ownership of the arry. + * @param vals An array of log fields to write, of size num_fields. + * The method takes ownership of the array. * * @return Returns true if the writer was successfully created. */ @@ -154,18 +154,18 @@ public: /** * Writes out log entries that have already passed through all * filters (and have raised any events). This is meant called for logs - * received alrready processed from remote. + * received already processed from remote. * - * @param stream The enum value corresponding the log stream. + * @param stream The enum value corresponding to the log stream. * - * @param writer The enum value corresponding the desired log writer. + * @param writer The enum value corresponding to the desired log writer. * * @param path The path of the target log stream to write to. * * @param num_fields The number of log values to write. * - * @param vals An arry of log values to write, of size num_fields. - * The method takes ownership of the arry. + * @param vals An array of log values to write, of size num_fields. + * The method takes ownership of the array. */ bool WriteFromRemote(EnumVal* stream, EnumVal* writer, string path, int num_fields, threading::Value** vals); diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc index e2afd360e7..ebf5165169 100644 --- a/src/logging/WriterBackend.cc +++ b/src/logging/WriterBackend.cc @@ -130,11 +130,11 @@ broker::data WriterBackend::WriterInfo::ToBroker() const auto t = broker::table(); for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i ) - { + { auto key = std::string(i->first); auto value = std::string(i->second); t.insert(std::make_pair(key, value)); - } + } auto bconfig = broker::record::field(move(t)); diff --git a/src/logging/WriterFrontend.h b/src/logging/WriterFrontend.h index c39d2bdf3c..ae37613261 100644 --- a/src/logging/WriterFrontend.h +++ b/src/logging/WriterFrontend.h @@ -34,8 +34,6 @@ public: * * info: The meta information struct for the writer. * - * writer_name: A descriptive name for the writer's type. - * * local: If true, the writer will instantiate a local backend. * * remote: If true, the writer will forward logs to remote @@ -46,7 +44,7 @@ public: * * Frontends must only be instantiated by the main thread. */ - WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote, int arg_remote_flags); + WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote, int remote_flags); /** * Destructor. From 22c89a83f5cf95ed77dcde11898a7bc55df54499 Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Fri, 24 Feb 2017 09:02:16 -0800 Subject: [PATCH 09/16] Update submodule [nomail] --- aux/bro-aux | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aux/bro-aux b/aux/bro-aux index b1e75f6a21..9f33570d53 160000 --- a/aux/bro-aux +++ b/aux/bro-aux @@ -1 +1 @@ -Subproject commit b1e75f6a212250b1730a438f27fc778618b67ec3 +Subproject commit 9f33570d53e1b970d7905e305940fd55637c5c76 From 0f695a7316fe2586fb1010675f45d549ffd00568 Mon Sep 17 00:00:00 2001 From: Daniel Thayer Date: Sat, 25 Feb 2017 21:53:02 -0600 Subject: [PATCH 10/16] Fix a test that sometimes fails on FreeBSD --- testing/btest/istate/pybroccoli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/testing/btest/istate/pybroccoli.py b/testing/btest/istate/pybroccoli.py index 7600c2b7d4..0d7106d592 100644 --- a/testing/btest/istate/pybroccoli.py +++ b/testing/btest/istate/pybroccoli.py @@ -4,6 +4,7 @@ # @TEST-REQUIRES: test -e $BUILD/aux/broccoli/bindings/broccoli-python/_broccoli_intern.so # # @TEST-EXEC: btest-bg-run bro bro %INPUT $DIST/aux/broccoli/bindings/broccoli-python/tests/test.bro +# @TEST-EXEC: sleep 2 # @TEST-EXEC: btest-bg-run python PYTHONPATH=$DIST/aux/broccoli/bindings/broccoli-python/:$BUILD/aux/broccoli/bindings/broccoli-python python $DIST/aux/broccoli/bindings/broccoli-python/tests/test.py # @TEST-EXEC: btest-bg-wait -k 20 # @TEST-EXEC: btest-diff bro/.stdout From 58a2d06c93189cea3f91efe2566fbe1703b9dc72 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Mon, 27 Feb 2017 08:22:16 -0800 Subject: [PATCH 11/16] Another fix for the new Broker-based remote logging. --- src/broker/Data.cc | 7 ++-- .../Baseline/broker.remote_log/recv.test.log | 4 +-- .../Baseline/broker.remote_log/send.test.log | 4 +-- testing/btest/broker/remote_log.test | 33 ++++++++++--------- testing/btest/broker/remote_log_types.test | 3 +- 5 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/broker/Data.cc b/src/broker/Data.cc index 5da6f5d4e0..6420144193 100644 --- a/src/broker/Data.cc +++ b/src/broker/Data.cc @@ -1080,16 +1080,19 @@ threading::Value* bro_broker::data_to_threading_val(broker::data d) auto type = broker::get(*r->get(0)); auto present = broker::get(*r->get(1)); - auto data = *r->get(2); + auto data = r->get(2); if ( ! (type && present) ) return nullptr; + if ( *present && ! data ) + return nullptr; + auto tv = new threading::Value; tv->type = static_cast(*type); tv->present = *present; - if ( present && ! broker::visit(threading_val_converter{tv->type, tv->val}, data) ) + if ( *present && ! broker::visit(threading_val_converter{tv->type, tv->val}, *data) ) { delete tv; return nullptr; diff --git a/testing/btest/Baseline/broker.remote_log/recv.test.log b/testing/btest/Baseline/broker.remote_log/recv.test.log index 0d6dae756c..b79e1b53b9 100644 --- a/testing/btest/Baseline/broker.remote_log/recv.test.log +++ b/testing/btest/Baseline/broker.remote_log/recv.test.log @@ -3,7 +3,7 @@ #empty_field (empty) #unset_field - #path test -#open 2015-01-26-22-47-11 +#open 2017-02-27-16-21-20 #fields msg num #types string count ping 0 @@ -12,4 +12,4 @@ ping 2 ping 3 ping 4 ping 5 -#close 2015-01-26-22-47-11 +#close 2017-02-27-16-21-20 diff --git a/testing/btest/Baseline/broker.remote_log/send.test.log b/testing/btest/Baseline/broker.remote_log/send.test.log index 0d6dae756c..93862c656b 100644 --- a/testing/btest/Baseline/broker.remote_log/send.test.log +++ b/testing/btest/Baseline/broker.remote_log/send.test.log @@ -3,7 +3,7 @@ #empty_field (empty) #unset_field - #path test -#open 2015-01-26-22-47-11 +#open 2017-02-27-16-21-19 #fields msg num #types string count ping 0 @@ -12,4 +12,4 @@ ping 2 ping 3 ping 4 ping 5 -#close 2015-01-26-22-47-11 +#close 2017-02-27-16-21-20 diff --git a/testing/btest/broker/remote_log.test b/testing/btest/broker/remote_log.test index 598f3f6e46..df8f8d8d8a 100644 --- a/testing/btest/broker/remote_log.test +++ b/testing/btest/broker/remote_log.test @@ -45,14 +45,14 @@ redef exit_only_after_terminate = T; event bro_init() { Broker::subscribe_to_logs("bro/log/"); - Broker::subscribe_to_events("bro/event/"); + Broker::subscribe_to_events("bro/event/"); Broker::listen(broker_port, "127.0.0.1"); } event quit_receiver() - { - terminate(); - } + { + terminate(); + } @TEST-END-FILE @@ -63,21 +63,22 @@ const broker_port: port &redef; redef exit_only_after_terminate = T; event bro_init() - { - Broker::enable_remote_logs(Test::LOG); - Broker::connect("127.0.0.1", broker_port, 1secs); - } + { + Broker::enable_remote_logs(Test::LOG); + Broker::publish_topic("bro/event/"); + Broker::connect("127.0.0.1", broker_port, 1secs); + } global n = 0; event do_write() { if ( n == 6 ) - { - local args = Broker::event_args(quit_receiver); - Broker::send_event("bro/event/", args); - schedule 1sec { quit_sender() }; - } + { + local args = Broker::event_args(quit_receiver); + Broker::send_event("bro/event/", args); + schedule 1sec { quit_sender() }; + } else { Log::write(Test::LOG, [$msg = "ping", $num = n]); @@ -87,9 +88,9 @@ event do_write() } event quit_sender() - { - terminate(); - } + { + terminate(); + } event Broker::outgoing_connection_established(peer_address: string, peer_port: port, diff --git a/testing/btest/broker/remote_log_types.test b/testing/btest/broker/remote_log_types.test index 9089e087cb..48011a8c07 100644 --- a/testing/btest/broker/remote_log_types.test +++ b/testing/btest/broker/remote_log_types.test @@ -60,7 +60,7 @@ redef exit_only_after_terminate = T; event bro_init() { Broker::subscribe_to_logs("bro/log/"); - Broker::subscribe_to_events("bro/event/"); + Broker::subscribe_to_events("bro/event/"); Broker::listen(broker_port, "127.0.0.1"); } @@ -80,6 +80,7 @@ redef exit_only_after_terminate = T; event bro_init() { Broker::enable_remote_logs(Test::LOG); + Broker::publish_topic("bro/event/"); Broker::connect("127.0.0.1", broker_port, 1secs); } From 01a39436353d6df202b152e307b6453c990c7fd2 Mon Sep 17 00:00:00 2001 From: Seth Hall Date: Tue, 28 Feb 2017 12:40:01 -0500 Subject: [PATCH 12/16] Do some updates to remove build time warnings. The linker was complaining about linking files that didn't have any symbols. These were actually empty files so I just got rid of them and removed references to them. --- src/analyzer/protocol/ayiya/AYIYA.cc | 1 - src/analyzer/protocol/ayiya/CMakeLists.txt | 1 - src/analyzer/protocol/ayiya/ayiya.pac | 4 ---- src/analyzer/protocol/ayiya/events.bif | 0 src/analyzer/protocol/gssapi/CMakeLists.txt | 2 +- src/analyzer/protocol/gssapi/gssapi.pac | 1 - src/analyzer/protocol/gssapi/types.bif | 1 - src/analyzer/protocol/pia/CMakeLists.txt | 1 - src/analyzer/protocol/pia/PIA.cc | 2 -- src/analyzer/protocol/pia/events.bif | 0 src/analyzer/protocol/zip/CMakeLists.txt | 1 - src/analyzer/protocol/zip/ZIP.cc | 2 -- src/analyzer/protocol/zip/events.bif | 0 .../canonified_loaded_scripts.log | 8 ++------ .../canonified_loaded_scripts.log | 8 ++------ testing/btest/Baseline/plugins.hooks/output | 20 ++++++------------- 16 files changed, 11 insertions(+), 41 deletions(-) delete mode 100644 src/analyzer/protocol/ayiya/events.bif delete mode 100644 src/analyzer/protocol/gssapi/types.bif delete mode 100644 src/analyzer/protocol/pia/events.bif delete mode 100644 src/analyzer/protocol/zip/events.bif diff --git a/src/analyzer/protocol/ayiya/AYIYA.cc b/src/analyzer/protocol/ayiya/AYIYA.cc index a1e00e9b38..9c4ac237ab 100644 --- a/src/analyzer/protocol/ayiya/AYIYA.cc +++ b/src/analyzer/protocol/ayiya/AYIYA.cc @@ -1,7 +1,6 @@ #include "AYIYA.h" #include "Func.h" -#include "events.bif.h" using namespace analyzer::ayiya; diff --git a/src/analyzer/protocol/ayiya/CMakeLists.txt b/src/analyzer/protocol/ayiya/CMakeLists.txt index ae23c25e2d..50113b72d7 100644 --- a/src/analyzer/protocol/ayiya/CMakeLists.txt +++ b/src/analyzer/protocol/ayiya/CMakeLists.txt @@ -5,6 +5,5 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI bro_plugin_begin(Bro AYIYA) bro_plugin_cc(AYIYA.cc Plugin.cc) -bro_plugin_bif(events.bif) bro_plugin_pac(ayiya.pac ayiya-protocol.pac ayiya-analyzer.pac) bro_plugin_end() diff --git a/src/analyzer/protocol/ayiya/ayiya.pac b/src/analyzer/protocol/ayiya/ayiya.pac index b1f3a6ef77..ff0af4d47c 100644 --- a/src/analyzer/protocol/ayiya/ayiya.pac +++ b/src/analyzer/protocol/ayiya/ayiya.pac @@ -2,10 +2,6 @@ %include binpac.pac %include bro.pac -%extern{ -#include "events.bif.h" -%} - analyzer AYIYA withcontext { connection: AYIYA_Conn; flow: AYIYA_Flow; diff --git a/src/analyzer/protocol/ayiya/events.bif b/src/analyzer/protocol/ayiya/events.bif deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/analyzer/protocol/gssapi/CMakeLists.txt b/src/analyzer/protocol/gssapi/CMakeLists.txt index 222c3cdf4e..d826d36bf7 100644 --- a/src/analyzer/protocol/gssapi/CMakeLists.txt +++ b/src/analyzer/protocol/gssapi/CMakeLists.txt @@ -5,7 +5,7 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI bro_plugin_begin(Bro GSSAPI) bro_plugin_cc(GSSAPI.cc Plugin.cc) -bro_plugin_bif(types.bif events.bif) +bro_plugin_bif(events.bif) bro_plugin_pac( gssapi.pac gssapi-protocol.pac diff --git a/src/analyzer/protocol/gssapi/gssapi.pac b/src/analyzer/protocol/gssapi/gssapi.pac index 07759e8daa..55b7fe4255 100644 --- a/src/analyzer/protocol/gssapi/gssapi.pac +++ b/src/analyzer/protocol/gssapi/gssapi.pac @@ -5,7 +5,6 @@ #include "analyzer/Manager.h" #include "analyzer/Analyzer.h" -#include "types.bif.h" #include "events.bif.h" %} diff --git a/src/analyzer/protocol/gssapi/types.bif b/src/analyzer/protocol/gssapi/types.bif deleted file mode 100644 index 996cee9ad8..0000000000 --- a/src/analyzer/protocol/gssapi/types.bif +++ /dev/null @@ -1 +0,0 @@ -# Empty. diff --git a/src/analyzer/protocol/pia/CMakeLists.txt b/src/analyzer/protocol/pia/CMakeLists.txt index ff55bcf0aa..02397f7aff 100644 --- a/src/analyzer/protocol/pia/CMakeLists.txt +++ b/src/analyzer/protocol/pia/CMakeLists.txt @@ -5,5 +5,4 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI bro_plugin_begin(Bro PIA) bro_plugin_cc(PIA.cc Plugin.cc) -bro_plugin_bif(events.bif) bro_plugin_end() diff --git a/src/analyzer/protocol/pia/PIA.cc b/src/analyzer/protocol/pia/PIA.cc index 7d73624dd0..8f5e23a1ce 100644 --- a/src/analyzer/protocol/pia/PIA.cc +++ b/src/analyzer/protocol/pia/PIA.cc @@ -3,8 +3,6 @@ #include "analyzer/protocol/tcp/TCP_Flags.h" #include "analyzer/protocol/tcp/TCP_Reassembler.h" -#include "events.bif.h" - using namespace analyzer::pia; PIA::PIA(analyzer::Analyzer* arg_as_analyzer) diff --git a/src/analyzer/protocol/pia/events.bif b/src/analyzer/protocol/pia/events.bif deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/src/analyzer/protocol/zip/CMakeLists.txt b/src/analyzer/protocol/zip/CMakeLists.txt index 814119f9f7..40c64afd6e 100644 --- a/src/analyzer/protocol/zip/CMakeLists.txt +++ b/src/analyzer/protocol/zip/CMakeLists.txt @@ -5,5 +5,4 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI bro_plugin_begin(Bro ZIP) bro_plugin_cc(ZIP.cc Plugin.cc) -bro_plugin_bif(events.bif) bro_plugin_end() diff --git a/src/analyzer/protocol/zip/ZIP.cc b/src/analyzer/protocol/zip/ZIP.cc index d14df95673..d44c6353cd 100644 --- a/src/analyzer/protocol/zip/ZIP.cc +++ b/src/analyzer/protocol/zip/ZIP.cc @@ -2,8 +2,6 @@ #include "ZIP.h" -#include "events.bif.h" - using namespace analyzer::zip; ZIP_Analyzer::ZIP_Analyzer(Connection* conn, bool orig, Method arg_method) diff --git a/src/analyzer/protocol/zip/events.bif b/src/analyzer/protocol/zip/events.bif deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log index 6587112ef2..d53b14ce58 100644 --- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log @@ -3,7 +3,7 @@ #empty_field (empty) #unset_field - #path loaded_scripts -#open 2016-11-02-17-25-26 +#open 2017-02-28-17-15-30 #fields name #types string scripts/base/init-bare.bro @@ -58,7 +58,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/top-k.bif.bro build/scripts/base/bif/plugins/__load__.bro build/scripts/base/bif/plugins/Bro_ARP.events.bif.bro - build/scripts/base/bif/plugins/Bro_AYIYA.events.bif.bro build/scripts/base/bif/plugins/Bro_BackDoor.events.bif.bro build/scripts/base/bif/plugins/Bro_BitTorrent.events.bif.bro build/scripts/base/bif/plugins/Bro_ConnSize.events.bif.bro @@ -74,7 +73,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/plugins/Bro_FTP.events.bif.bro build/scripts/base/bif/plugins/Bro_FTP.functions.bif.bro build/scripts/base/bif/plugins/Bro_Gnutella.events.bif.bro - build/scripts/base/bif/plugins/Bro_GSSAPI.types.bif.bro build/scripts/base/bif/plugins/Bro_GSSAPI.events.bif.bro build/scripts/base/bif/plugins/Bro_GTPv1.events.bif.bro build/scripts/base/bif/plugins/Bro_HTTP.events.bif.bro @@ -96,7 +94,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/plugins/Bro_NTLM.types.bif.bro build/scripts/base/bif/plugins/Bro_NTLM.events.bif.bro build/scripts/base/bif/plugins/Bro_NTP.events.bif.bro - build/scripts/base/bif/plugins/Bro_PIA.events.bif.bro build/scripts/base/bif/plugins/Bro_POP3.events.bif.bro build/scripts/base/bif/plugins/Bro_RADIUS.events.bif.bro build/scripts/base/bif/plugins/Bro_RDP.events.bif.bro @@ -150,7 +147,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/plugins/Bro_Teredo.events.bif.bro build/scripts/base/bif/plugins/Bro_UDP.events.bif.bro build/scripts/base/bif/plugins/Bro_XMPP.events.bif.bro - build/scripts/base/bif/plugins/Bro_ZIP.events.bif.bro build/scripts/base/bif/plugins/Bro_FileEntropy.events.bif.bro build/scripts/base/bif/plugins/Bro_FileExtract.events.bif.bro build/scripts/base/bif/plugins/Bro_FileExtract.functions.bif.bro @@ -171,4 +167,4 @@ scripts/base/init-bare.bro build/scripts/base/bif/plugins/Bro_SQLiteWriter.sqlite.bif.bro scripts/policy/misc/loaded-scripts.bro scripts/base/utils/paths.bro -#close 2016-11-02-17-25-26 +#close 2017-02-28-17-15-30 diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log index 7a7b127752..e11edefe16 100644 --- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log +++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log @@ -3,7 +3,7 @@ #empty_field (empty) #unset_field - #path loaded_scripts -#open 2016-11-02-17-25-18 +#open 2017-02-28-17-19-41 #fields name #types string scripts/base/init-bare.bro @@ -58,7 +58,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/top-k.bif.bro build/scripts/base/bif/plugins/__load__.bro build/scripts/base/bif/plugins/Bro_ARP.events.bif.bro - build/scripts/base/bif/plugins/Bro_AYIYA.events.bif.bro build/scripts/base/bif/plugins/Bro_BackDoor.events.bif.bro build/scripts/base/bif/plugins/Bro_BitTorrent.events.bif.bro build/scripts/base/bif/plugins/Bro_ConnSize.events.bif.bro @@ -74,7 +73,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/plugins/Bro_FTP.events.bif.bro build/scripts/base/bif/plugins/Bro_FTP.functions.bif.bro build/scripts/base/bif/plugins/Bro_Gnutella.events.bif.bro - build/scripts/base/bif/plugins/Bro_GSSAPI.types.bif.bro build/scripts/base/bif/plugins/Bro_GSSAPI.events.bif.bro build/scripts/base/bif/plugins/Bro_GTPv1.events.bif.bro build/scripts/base/bif/plugins/Bro_HTTP.events.bif.bro @@ -96,7 +94,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/plugins/Bro_NTLM.types.bif.bro build/scripts/base/bif/plugins/Bro_NTLM.events.bif.bro build/scripts/base/bif/plugins/Bro_NTP.events.bif.bro - build/scripts/base/bif/plugins/Bro_PIA.events.bif.bro build/scripts/base/bif/plugins/Bro_POP3.events.bif.bro build/scripts/base/bif/plugins/Bro_RADIUS.events.bif.bro build/scripts/base/bif/plugins/Bro_RDP.events.bif.bro @@ -150,7 +147,6 @@ scripts/base/init-bare.bro build/scripts/base/bif/plugins/Bro_Teredo.events.bif.bro build/scripts/base/bif/plugins/Bro_UDP.events.bif.bro build/scripts/base/bif/plugins/Bro_XMPP.events.bif.bro - build/scripts/base/bif/plugins/Bro_ZIP.events.bif.bro build/scripts/base/bif/plugins/Bro_FileEntropy.events.bif.bro build/scripts/base/bif/plugins/Bro_FileExtract.events.bif.bro build/scripts/base/bif/plugins/Bro_FileExtract.functions.bif.bro @@ -359,4 +355,4 @@ scripts/base/init-default.bro scripts/base/misc/find-filtered-trace.bro scripts/base/misc/version.bro scripts/policy/misc/loaded-scripts.bro -#close 2016-11-02-17-25-18 +#close 2017-02-28-17-19-41 diff --git a/testing/btest/Baseline/plugins.hooks/output b/testing/btest/Baseline/plugins.hooks/output index c291302748..420d20ae12 100644 --- a/testing/btest/Baseline/plugins.hooks/output +++ b/testing/btest/Baseline/plugins.hooks/output @@ -247,7 +247,7 @@ 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) -> 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) -> 0.000000 MetaHookPost CallFunction(Log::__create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -> -0.000000 MetaHookPost CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])) -> +0.000000 MetaHookPost CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Cluster::LOG)) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Communication::LOG)) -> 0.000000 MetaHookPost CallFunction(Log::add_default_filter, , (Conn::LOG)) -> @@ -377,7 +377,7 @@ 0.000000 MetaHookPost CallFunction(Log::create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) -> 0.000000 MetaHookPost CallFunction(Log::create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) -> 0.000000 MetaHookPost CallFunction(Log::create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -> -0.000000 MetaHookPost CallFunction(Log::write, , (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])) -> +0.000000 MetaHookPost CallFunction(Log::write, , (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])) -> 0.000000 MetaHookPost CallFunction(NetControl::check_plugins, , ()) -> 0.000000 MetaHookPost CallFunction(NetControl::init, , ()) -> 0.000000 MetaHookPost CallFunction(Notice::want_pp, , ()) -> @@ -416,7 +416,6 @@ 0.000000 MetaHookPost LoadFile(../main) -> -1 0.000000 MetaHookPost LoadFile(../plugin) -> -1 0.000000 MetaHookPost LoadFile(./Bro_ARP.events.bif.bro) -> -1 -0.000000 MetaHookPost LoadFile(./Bro_AYIYA.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_AsciiReader.ascii.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_AsciiWriter.ascii.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_BackDoor.events.bif.bro) -> -1 @@ -440,7 +439,6 @@ 0.000000 MetaHookPost LoadFile(./Bro_FileHash.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_Finger.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_GSSAPI.events.bif.bro) -> -1 -0.000000 MetaHookPost LoadFile(./Bro_GSSAPI.types.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_GTPv1.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_Gnutella.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_HTTP.events.bif.bro) -> -1 @@ -465,7 +463,6 @@ 0.000000 MetaHookPost LoadFile(./Bro_NetBIOS.functions.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_NoneWriter.none.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_PE.events.bif.bro) -> -1 -0.000000 MetaHookPost LoadFile(./Bro_PIA.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_POP3.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_RADIUS.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_RDP.events.bif.bro) -> -1 @@ -528,7 +525,6 @@ 0.000000 MetaHookPost LoadFile(./Bro_X509.functions.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_X509.types.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./Bro_XMPP.events.bif.bro) -> -1 -0.000000 MetaHookPost LoadFile(./Bro_ZIP.events.bif.bro) -> -1 0.000000 MetaHookPost LoadFile(./acld) -> -1 0.000000 MetaHookPost LoadFile(./addrs) -> -1 0.000000 MetaHookPost LoadFile(./analyzer.bif.bro) -> -1 @@ -968,7 +964,7 @@ 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::__create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -0.000000 MetaHookPre CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])) +0.000000 MetaHookPre CallFunction(Log::__write, , (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Cluster::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Communication::LOG)) 0.000000 MetaHookPre CallFunction(Log::add_default_filter, , (Conn::LOG)) @@ -1098,7 +1094,7 @@ 0.000000 MetaHookPre CallFunction(Log::create_stream, , (Weird::LOG, [columns=, ev=Weird::log_weird, path=weird])) 0.000000 MetaHookPre CallFunction(Log::create_stream, , (X509::LOG, [columns=, ev=X509::log_x509, path=x509])) 0.000000 MetaHookPre CallFunction(Log::create_stream, , (mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql])) -0.000000 MetaHookPre CallFunction(Log::write, , (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])) +0.000000 MetaHookPre CallFunction(Log::write, , (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])) 0.000000 MetaHookPre CallFunction(NetControl::check_plugins, , ()) 0.000000 MetaHookPre CallFunction(NetControl::init, , ()) 0.000000 MetaHookPre CallFunction(Notice::want_pp, , ()) @@ -1137,7 +1133,6 @@ 0.000000 MetaHookPre LoadFile(../main) 0.000000 MetaHookPre LoadFile(../plugin) 0.000000 MetaHookPre LoadFile(./Bro_ARP.events.bif.bro) -0.000000 MetaHookPre LoadFile(./Bro_AYIYA.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_AsciiReader.ascii.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_AsciiWriter.ascii.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_BackDoor.events.bif.bro) @@ -1161,7 +1156,6 @@ 0.000000 MetaHookPre LoadFile(./Bro_FileHash.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_Finger.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_GSSAPI.events.bif.bro) -0.000000 MetaHookPre LoadFile(./Bro_GSSAPI.types.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_GTPv1.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_Gnutella.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_HTTP.events.bif.bro) @@ -1186,7 +1180,6 @@ 0.000000 MetaHookPre LoadFile(./Bro_NetBIOS.functions.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_NoneWriter.none.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_PE.events.bif.bro) -0.000000 MetaHookPre LoadFile(./Bro_PIA.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_POP3.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_RADIUS.events.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_RDP.events.bif.bro) @@ -1249,7 +1242,6 @@ 0.000000 MetaHookPre LoadFile(./Bro_X509.functions.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_X509.types.bif.bro) 0.000000 MetaHookPre LoadFile(./Bro_XMPP.events.bif.bro) -0.000000 MetaHookPre LoadFile(./Bro_ZIP.events.bif.bro) 0.000000 MetaHookPre LoadFile(./acld) 0.000000 MetaHookPre LoadFile(./addrs) 0.000000 MetaHookPre LoadFile(./analyzer.bif.bro) @@ -1688,7 +1680,7 @@ 0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql]) -0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T]) +0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Communication::LOG) 0.000000 | HookCallFunction Log::add_default_filter(Conn::LOG) @@ -1818,7 +1810,7 @@ 0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=, ev=Weird::log_weird, path=weird]) 0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=, ev=X509::log_x509, path=x509]) 0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=, ev=MySQL::log_mysql, path=mysql]) -0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T]) +0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T]) 0.000000 | HookCallFunction NetControl::check_plugins() 0.000000 | HookCallFunction NetControl::init() 0.000000 | HookCallFunction Notice::want_pp() From 9341ff801c2bc0445553305a67d5c78442c0dbf3 Mon Sep 17 00:00:00 2001 From: Johanna Amann Date: Thu, 2 Mar 2017 08:53:38 -0800 Subject: [PATCH 13/16] Move threading to c++11 primitives (mostly). This moves all threading code in Bro from pthreads to the c++11 primitives, which make for shorter, easier to use, and less error-prone code. pthreads is still used in 2 places in Bro currently. BasicThread uses two bits of functionality that are not available using the c++ API (setting thread names & setting signal masks). Since all c++ implementations that I am aware of still use an underlying pthreads implementation, we just use native_handle to access the underlying pthreads implementation for these cases. I do not expect this to lead to problems in the forseable future. If we ever encounter a platform where a different thread architecture is used, we might have to change that around. This code is guarded by static_asserts, so we will notice if a platform uses a different implementation. sqlite also uses pthreads directly. --- src/input/readers/raw/Plugin.cc | 12 +--- src/input/readers/raw/Plugin.h | 8 +-- src/input/readers/raw/Raw.cc | 30 +++------ src/input/readers/raw/Raw.h | 6 +- src/threading/BasicThread.cc | 21 ++++--- src/threading/BasicThread.h | 10 +-- src/threading/MsgThread.h | 2 - src/threading/Queue.h | 104 +++++++++++--------------------- 8 files changed, 70 insertions(+), 123 deletions(-) diff --git a/src/input/readers/raw/Plugin.cc b/src/input/readers/raw/Plugin.cc index c7af84e34e..e16a233fe6 100644 --- a/src/input/readers/raw/Plugin.cc +++ b/src/input/readers/raw/Plugin.cc @@ -8,7 +8,6 @@ using namespace plugin::Bro_RawReader; Plugin::Plugin() { - init = false; } plugin::Configuration Plugin::Configure() @@ -23,21 +22,14 @@ plugin::Configuration Plugin::Configure() void Plugin::InitPreScript() { - if ( pthread_mutex_init(&fork_mutex, 0) != 0 ) - reporter->FatalError("cannot initialize raw reader's mutex"); - - init = true; } void Plugin::Done() { - pthread_mutex_destroy(&fork_mutex); - init = false; } -pthread_mutex_t* Plugin::ForkMutex() +std::unique_lock Plugin::ForkMutex() { - assert(init); - return &fork_mutex; + return std::unique_lock(fork_mutex, std::defer_lock); } diff --git a/src/input/readers/raw/Plugin.h b/src/input/readers/raw/Plugin.h index 59a5dfd2be..7b621c9440 100644 --- a/src/input/readers/raw/Plugin.h +++ b/src/input/readers/raw/Plugin.h @@ -1,8 +1,9 @@ -// See the file in the main distribution directory for copyright. +// See the file in the main distribution directory for copyright. #include "plugin/Plugin.h" #include "Raw.h" +#include namespace plugin { namespace Bro_RawReader { @@ -16,11 +17,10 @@ public: virtual void InitPreScript(); virtual void Done(); - pthread_mutex_t * ForkMutex(); + std::unique_lock ForkMutex(); private: - bool init; - pthread_mutex_t fork_mutex; + std::mutex fork_mutex; }; diff --git a/src/input/readers/raw/Raw.cc b/src/input/readers/raw/Raw.cc index cfa7b72602..c892c207cc 100644 --- a/src/input/readers/raw/Raw.cc +++ b/src/input/readers/raw/Raw.cc @@ -96,24 +96,16 @@ bool Raw::SetFDFlags(int fd, int cmd, int flags) } -bool Raw::LockForkMutex() +std::unique_lock Raw::AcquireForkMutex() { - int res = pthread_mutex_lock(plugin::Bro_RawReader::plugin.ForkMutex()); - if ( res == 0 ) - return true; - - Error(Fmt("cannot lock fork mutex: %d", res)); - return false; + auto lock = plugin::Bro_RawReader::plugin.ForkMutex(); + try { + lock.lock(); + } catch(const std::system_error& e) { + reporter->FatalErrorWithCore("cannot lock fork mutex: %s", e.what()); } -bool Raw::UnlockForkMutex() - { - int res = pthread_mutex_unlock(plugin::Bro_RawReader::plugin.ForkMutex()); - if ( res == 0 ) - return true; - - Error(Fmt("cannot unlock fork mutex: %d", res)); - return false; + return lock; } bool Raw::Execute() @@ -126,12 +118,10 @@ bool Raw::Execute() // never crops up... ("never" meaning I haven't seen in it in // hundreds of tests using 50+ threads where before I'd see the issue // w/ just 2 threads ~33% of the time). - if ( ! LockForkMutex() ) - return false; + auto lock = AcquireForkMutex(); if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) ) { - UnlockForkMutex(); Error(Fmt("Could not open pipe: %d", errno)); return false; } @@ -139,7 +129,6 @@ bool Raw::Execute() childpid = fork(); if ( childpid < 0 ) { - UnlockForkMutex(); Error(Fmt("Could not create child process: %d", errno)); return false; } @@ -208,8 +197,7 @@ bool Raw::Execute() } } - if ( ! UnlockForkMutex() ) - return false; + lock.unlock(); ClosePipeEnd(stdout_out); diff --git a/src/input/readers/raw/Raw.h b/src/input/readers/raw/Raw.h index 2a166ae322..ec50ade8fe 100644 --- a/src/input/readers/raw/Raw.h +++ b/src/input/readers/raw/Raw.h @@ -4,8 +4,8 @@ #define INPUT_READERS_RAW_H #include -#include #include +#include #include "input/ReaderBackend.h" @@ -37,8 +37,7 @@ protected: private: void ClosePipeEnd(int i); bool SetFDFlags(int fd, int cmd, int flags); - bool LockForkMutex(); - bool UnlockForkMutex(); + std::unique_lock AcquireForkMutex(); bool OpenInput(); bool CloseInput(); @@ -87,7 +86,6 @@ private: }; static const int block_size; - static pthread_mutex_t fork_mutex; }; } diff --git a/src/threading/BasicThread.cc b/src/threading/BasicThread.cc index 86d7d7b560..ea6c8c433b 100644 --- a/src/threading/BasicThread.cc +++ b/src/threading/BasicThread.cc @@ -5,6 +5,7 @@ #include "bro-config.h" #include "BasicThread.h" #include "Manager.h" +#include "pthread.h" #ifdef HAVE_LINUX #include @@ -21,7 +22,6 @@ BasicThread::BasicThread() started = false; terminating = false; killed = false; - pthread = 0; buf_len = STD_FMT_BUF_LEN; buf = (char*) safe_malloc(buf_len); @@ -50,6 +50,7 @@ void BasicThread::SetName(const char* arg_name) void BasicThread::SetOSName(const char* arg_name) { + static_assert(std::is_same::value, "libstdc++ doesn't use pthread_t"); #ifdef HAVE_LINUX prctl(PR_SET_NAME, arg_name, 0, 0, 0); @@ -60,7 +61,7 @@ void BasicThread::SetOSName(const char* arg_name) #endif #ifdef FREEBSD - pthread_set_name_np(pthread_self(), arg_name, arg_name); + pthread_set_name_np(thread.native_handle(), arg_name, arg_name); #endif } @@ -108,9 +109,7 @@ void BasicThread::Start() started = true; - int err = pthread_create(&pthread, 0, BasicThread::launcher, this); - if ( err != 0 ) - reporter->FatalError("Cannot create thread %s: %s", name, Strerror(err)); + thread = std::thread(&BasicThread::launcher, this); DBG_LOG(DBG_THREADING, "Started thread %s", name); @@ -147,17 +146,18 @@ void BasicThread::Join() if ( ! started ) return; - if ( ! pthread ) + if ( ! thread.joinable() ) return; assert(terminating); - if ( pthread_join(pthread, 0) != 0 ) - reporter->FatalError("Failure joining thread %s", name); + try { + thread.join(); + } catch(const std::system_error& e) { + reporter->FatalError("Failure joining thread %s with error %s", name, e.what()); + } DBG_LOG(DBG_THREADING, "Joined with thread %s", name); - - pthread = 0; } void BasicThread::Kill() @@ -180,6 +180,7 @@ void BasicThread::Done() void* BasicThread::launcher(void *arg) { + static_assert(std::is_same::value, "libstdc++ doesn't use pthread_t"); BasicThread* thread = (BasicThread *)arg; // Block signals in thread. We handle signals only in the main diff --git a/src/threading/BasicThread.h b/src/threading/BasicThread.h index 6386e5ae66..ea829fce54 100644 --- a/src/threading/BasicThread.h +++ b/src/threading/BasicThread.h @@ -2,8 +2,7 @@ #ifndef THREADING_BASICTHREAD_H #define THREADING_BASICTHREAD_H -#include -#include +#include #include "util.h" @@ -35,6 +34,9 @@ public: */ BasicThread(); + BasicThread(BasicThread const&) = delete; + BasicThread& operator =(BasicThread const&) = delete; + /** * Returns a descriptive name for the thread. If not set via * SetName(). If not set, a default name is choosen automatically. @@ -192,11 +194,11 @@ protected: void Done(); private: - // pthread entry function. + // thread entry function. static void* launcher(void *arg); const char* name; - pthread_t pthread; + std::thread thread; bool started; // Set to to true once running. bool terminating; // Set to to true to signal termination. bool killed; // Set to true once forcefully killed. diff --git a/src/threading/MsgThread.h b/src/threading/MsgThread.h index 96da68e1d0..480ab2974c 100644 --- a/src/threading/MsgThread.h +++ b/src/threading/MsgThread.h @@ -2,8 +2,6 @@ #ifndef THREADING_MSGTHREAD_H #define THREADING_MSGTHREAD_H -#include - #include "DebugLogger.h" #include "BasicThread.h" diff --git a/src/threading/Queue.h b/src/threading/Queue.h index 6d21bfd998..48b41ad81a 100644 --- a/src/threading/Queue.h +++ b/src/threading/Queue.h @@ -1,7 +1,8 @@ #ifndef THREADING_QUEUE_H #define THREADING_QUEUE_H -#include +#include +#include #include #include #include @@ -22,7 +23,7 @@ namespace threading { * * All Queue instances must be instantiated by Bro's main thread. * - * TODO: Unclear how critical performance is for this qeueue. We could like;y + * TODO: Unclear how critical performance is for this qeueue. We could likely * optimize it further if helpful. */ template @@ -71,9 +72,10 @@ public: */ bool MaybeReady() { return (num_reads != num_writes); } - /** Wake up the reader if it's currently blocked for input. This is - primarily to give it a chance to check termination quickly. - **/ + /** + * Wake up the reader if it's currently blocked for input. This is + * primarily to give it a chance to check termination quickly. + */ void WakeUp(); /** @@ -94,14 +96,15 @@ public: * Returns statistics about the queue's usage. * * @param stats A pointer to a structure that will be filled with - * current numbers. */ + * current numbers. + */ void GetStats(Stats* stats); private: static const int NUM_QUEUES = 8; - pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses. - pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available + std::mutex mutex[NUM_QUEUES]; // Mutex protected shared accesses. + std::condition_variable has_data[NUM_QUEUES]; // Signals when data becomes available std::queue messages[NUM_QUEUES]; // Actually holds the queued messages int read_ptr; // Where the next operation will read from @@ -115,17 +118,17 @@ private: uint64_t num_writes; }; -inline static void safe_lock(pthread_mutex_t* mutex) +inline static std::unique_lock safe_lock(std::mutex& m) { - int res = pthread_mutex_lock(mutex); - if ( res != 0 ) - reporter->FatalErrorWithCore("cannot lock mutex: %d(%s)", res, strerror(res)); + std::unique_lock lock(m, std::defer_lock); + + try { + lock.lock(); + } catch(const std::system_error& e) { + reporter->FatalErrorWithCore("cannot lock mutex: %s", e.what()); } -inline static void safe_unlock(pthread_mutex_t* mutex) - { - if ( pthread_mutex_unlock(mutex) != 0 ) - reporter->FatalErrorWithCore("cannot unlock mutex"); + return lock; } template @@ -136,50 +139,28 @@ inline Queue::Queue(BasicThread* arg_reader, BasicThread* arg_writer) num_reads = num_writes = 0; reader = arg_reader; writer = arg_writer; - - for( int i = 0; i < NUM_QUEUES; ++i ) - { - if ( pthread_cond_init(&has_data[i], 0) != 0 ) - reporter->FatalError("cannot init queue condition variable"); - - if ( pthread_mutex_init(&mutex[i], 0) != 0 ) - reporter->FatalError("cannot init queue mutex"); - } } template inline Queue::~Queue() { - for( int i = 0; i < NUM_QUEUES; ++i ) - { - pthread_cond_destroy(&has_data[i]); - pthread_mutex_destroy(&mutex[i]); - } } template inline T Queue::Get() { - safe_lock(&mutex[read_ptr]); + auto lock = safe_lock(mutex[read_ptr]); int old_read_ptr = read_ptr; if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) ) { - struct timespec ts; - ts.tv_sec = time(0) + 5; - ts.tv_nsec = 0; - - pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts); - safe_unlock(&mutex[read_ptr]); - return 0; + if ( has_data[read_ptr].wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout ) + return nullptr; } - else if ( messages[read_ptr].empty() ) - { - safe_unlock(&mutex[read_ptr]); - return 0; - } + if ( messages[read_ptr].empty() ) + return nullptr; T data = messages[read_ptr].front(); messages[read_ptr].pop(); @@ -187,15 +168,13 @@ inline T Queue::Get() read_ptr = (read_ptr + 1) % NUM_QUEUES; ++num_reads; - safe_unlock(&mutex[old_read_ptr]); - return data; } template inline void Queue::Put(T data) { - safe_lock(&mutex[write_ptr]); + auto lock = safe_lock(mutex[write_ptr]); int old_write_ptr = write_ptr; @@ -203,25 +182,24 @@ inline void Queue::Put(T data) messages[write_ptr].push(data); - if ( need_signal ) - pthread_cond_signal(&has_data[write_ptr]); - write_ptr = (write_ptr + 1) % NUM_QUEUES; ++num_writes; - safe_unlock(&mutex[old_write_ptr]); + if ( need_signal ) + { + lock.unlock(); + has_data[old_write_ptr].notify_one(); + } } template inline bool Queue::Ready() { - safe_lock(&mutex[read_ptr]); + auto lock = safe_lock(mutex[read_ptr]); bool ret = (messages[read_ptr].size()); - safe_unlock(&mutex[read_ptr]); - return ret; } @@ -229,17 +207,15 @@ template inline uint64_t Queue::Size() { // Need to lock all queues. + std::vector> locks; for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_lock(&mutex[i]); + locks.push_back(safe_lock(mutex[i])); uint64_t size = 0; for ( int i = 0; i < NUM_QUEUES; i++ ) size += messages[i].size(); - for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_unlock(&mutex[i]); - return size; } @@ -248,29 +224,21 @@ inline void Queue::GetStats(Stats* stats) { // To be safe, we look all queues. That's probably unneccessary, but // doesn't really hurt. + std::vector> locks; for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_lock(&mutex[i]); + locks.push_back(safe_lock(mutex[i])); stats->num_reads = num_reads; stats->num_writes = num_writes; - - for ( int i = 0; i < NUM_QUEUES; i++ ) - safe_unlock(&mutex[i]); } template inline void Queue::WakeUp() { for ( int i = 0; i < NUM_QUEUES; i++ ) - { - safe_lock(&mutex[i]); - pthread_cond_signal(&has_data[i]); - safe_unlock(&mutex[i]); - } + has_data[i].notify_all(); } } - #endif - From 766bab077144e9341f4f29bdedebdf621ccee584 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Thu, 2 Mar 2017 16:45:07 -0800 Subject: [PATCH 14/16] Updating submodule. --- aux/btest | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aux/btest b/aux/btest index 2fb0e089b3..14c18508bb 160000 --- a/aux/btest +++ b/aux/btest @@ -1 +1 @@ -Subproject commit 2fb0e089b37cc62b4616c8d4309ee80fb7fc6a27 +Subproject commit 14c18508bb385c8f55cd79738ee87ca2b85eba27 From f616903e5ffc1b28142bb6f4e94424993d772801 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Fri, 3 Mar 2017 10:44:14 -0800 Subject: [PATCH 15/16] Updating submodule(s). [nomail] --- CHANGES | 2 +- VERSION | 2 +- aux/plugins | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGES b/CHANGES index 9c58d9338f..862174c068 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,5 @@ -2.5-91 | 2017-03-03 10:30:12 -0800 +2.5-92 | 2017-03-03 10:44:14 -0800 * Move most threading to C++11 primitives (mostly). (Johanna Amann) diff --git a/VERSION b/VERSION index 2cca69f0f3..540a520883 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.5-91 +2.5-92 diff --git a/aux/plugins b/aux/plugins index 2322840bcd..c4b5df3aa8 160000 --- a/aux/plugins +++ b/aux/plugins @@ -1 +1 @@ -Subproject commit 2322840bcdbd618ae7bd24e22d874fb30ab89bbb +Subproject commit c4b5df3aa8e5c58a2dc5e5040c7da8369894f24d From dc2cfd8a1071ec3b30bd130d859758b3353f4b11 Mon Sep 17 00:00:00 2001 From: Robin Sommer Date: Fri, 3 Mar 2017 12:51:54 -0800 Subject: [PATCH 16/16] Updating submodule(s). [nomail] --- aux/btest | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aux/btest b/aux/btest index 14c18508bb..dceda16935 160000 --- a/aux/btest +++ b/aux/btest @@ -1 +1 @@ -Subproject commit 14c18508bb385c8f55cd79738ee87ca2b85eba27 +Subproject commit dceda169351ddd0c7fe7a5ae5496be1d7af2367b