Merge remote-tracking branch 'origin/master' into topic/seth/input-thread-behavior

This commit is contained in:
Seth Hall 2017-03-09 12:18:41 -05:00
commit ffdd684eaa
53 changed files with 1355 additions and 341 deletions

24
CHANGES
View file

@ -1,4 +1,28 @@
2.5-92 | 2017-03-03 10:44:14 -0800
* Move most threading to C++11 primitives (mostly). (Johanna Amann)
* Fix a test that sometimes fails on FreeBSD. (Daniel Thayer)
* Remove build time warnings. (Seth Hall)
2.5-84 | 2017-02-27 15:08:55 -0500
* Change semantics of Broker's remote logging to match old communication
framework. (Robin Sommer)
* Add and fix documentation for HookSetupAnalyzerTree (Johanna Amann)
2.5-76 | 2017-02-23 10:19:57 -0800
* Kerberos ciphertext had some additional ASN.1 content being lumped
in. (Vlad Grigorescu)
* Updated Windows version detection to include Windows 10. (Fatema
Bannatwala, Keith Lehigh, Mike, Seth Hall).
2.5-70 | 2017-02-20 00:20:02 -0500
* Rework the RADIUS base script.

View file

@ -1 +1 @@
2.5-70
2.5-92

@ -1 +1 @@
Subproject commit b1e75f6a212250b1730a438f27fc778618b67ec3
Subproject commit 9f33570d53e1b970d7905e305940fd55637c5c76

@ -1 +1 @@
Subproject commit 9d5c7bcac9b04710931bc8a42b545f0691561b2f
Subproject commit dceda169351ddd0c7fe7a5ae5496be1d7af2367b

@ -1 +1 @@
Subproject commit 2322840bcdbd618ae7bd24e22d874fb30ab89bbb
Subproject commit c4b5df3aa8e5c58a2dc5e5040c7da8369894f24d

View file

@ -2730,8 +2730,7 @@ bool RemoteSerializer::ProcessLogCreateWriter()
id_val = new EnumVal(id, internal_type("Log::ID")->AsEnumType());
writer_val = new EnumVal(writer, internal_type("Log::Writer")->AsEnumType());
if ( ! log_mgr->CreateWriter(id_val, writer_val, info, num_fields, fields,
true, false, true) )
if ( ! log_mgr->CreateWriterForRemoteLog(id_val, writer_val, info, num_fields, fields) )
{
delete_fields_up_to = num_fields;
goto error;
@ -2803,7 +2802,7 @@ bool RemoteSerializer::ProcessLogWrite()
id_val = new EnumVal(id, internal_type("Log::ID")->AsEnumType());
writer_val = new EnumVal(writer, internal_type("Log::Writer")->AsEnumType());
success = log_mgr->Write(id_val, writer_val, path, num_fields, vals);
success = log_mgr->WriteFromRemote(id_val, writer_val, path, num_fields, vals);
Unref(id_val);
Unref(writer_val);

View file

@ -1,7 +1,6 @@
#include "AYIYA.h"
#include "Func.h"
#include "events.bif.h"
using namespace analyzer::ayiya;

View file

@ -5,6 +5,5 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI
bro_plugin_begin(Bro AYIYA)
bro_plugin_cc(AYIYA.cc Plugin.cc)
bro_plugin_bif(events.bif)
bro_plugin_pac(ayiya.pac ayiya-protocol.pac ayiya-analyzer.pac)
bro_plugin_end()

View file

@ -2,10 +2,6 @@
%include binpac.pac
%include bro.pac
%extern{
#include "events.bif.h"
%}
analyzer AYIYA withcontext {
connection: AYIYA_Conn;
flow: AYIYA_Flow;

View file

@ -5,7 +5,7 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI
bro_plugin_begin(Bro GSSAPI)
bro_plugin_cc(GSSAPI.cc Plugin.cc)
bro_plugin_bif(types.bif events.bif)
bro_plugin_bif(events.bif)
bro_plugin_pac(
gssapi.pac
gssapi-protocol.pac

View file

@ -5,7 +5,6 @@
#include "analyzer/Manager.h"
#include "analyzer/Analyzer.h"
#include "types.bif.h"
#include "events.bif.h"
%}

View file

@ -1 +0,0 @@
# Empty.

View file

@ -95,7 +95,7 @@ RecordVal* proc_ticket(const KRB_Ticket* ticket)
rv->Assign(1, bytestring_to_val(ticket->realm()->data()->content()));
rv->Assign(2, GetStringFromPrincipalName(ticket->sname()));
rv->Assign(3, asn1_integer_to_val(ticket->enc_part()->data()->etype()->data(), TYPE_COUNT));
rv->Assign(4, bytestring_to_val(ticket->enc_part()->data()->ciphertext()));
rv->Assign(4, bytestring_to_val(ticket->enc_part()->data()->ciphertext()->encoding()->content()));
return rv;
}
@ -162,7 +162,7 @@ type KRB_Encrypted_Data = record {
true -> next_meta: ASN1EncodingMeta;
false -> none_meta: empty;
};
ciphertext : bytestring &length=have_kvno ? next_meta.length : kvno_meta.length;
ciphertext : ASN1OctetString &length=have_kvno ? next_meta.length : kvno_meta.length;
} &let {
have_kvno : bool = kvno_meta.index == 1;
};

View file

@ -5,5 +5,4 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI
bro_plugin_begin(Bro PIA)
bro_plugin_cc(PIA.cc Plugin.cc)
bro_plugin_bif(events.bif)
bro_plugin_end()

View file

@ -3,8 +3,6 @@
#include "analyzer/protocol/tcp/TCP_Flags.h"
#include "analyzer/protocol/tcp/TCP_Reassembler.h"
#include "events.bif.h"
using namespace analyzer::pia;
PIA::PIA(analyzer::Analyzer* arg_as_analyzer)

View file

@ -5,5 +5,4 @@ include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DI
bro_plugin_begin(Bro ZIP)
bro_plugin_cc(ZIP.cc Plugin.cc)
bro_plugin_bif(events.bif)
bro_plugin_end()

View file

@ -2,8 +2,6 @@
#include "ZIP.h"
#include "events.bif.h"
using namespace analyzer::zip;
ZIP_Analyzer::ZIP_Analyzer(Connection* conn, bool orig, Method arg_method)

View file

@ -709,3 +709,435 @@ bool bro_broker::DataVal::DoUnserialize(UnserialInfo* info)
delete [] serial;
return true;
}
static broker::util::optional<broker::data> threading_val_to_data_internal(TypeTag type, const threading::Value::_val& val)
{
switch ( type ) {
case TYPE_BOOL:
return {val.int_val != 0};
case TYPE_INT:
return {val.int_val};
case TYPE_COUNT:
case TYPE_COUNTER:
return {val.uint_val};
case TYPE_PORT:
return {broker::port(val.port_val.port, to_broker_port_proto(val.port_val.proto))};
case TYPE_ADDR:
{
IPAddr a;
switch ( val.addr_val.family ) {
case IPv4:
a = IPAddr(val.addr_val.in.in4);
break;
case IPv6:
a = IPAddr(val.addr_val.in.in6);
break;
default:
reporter->InternalError("unsupported protocol family in threading_val_to_data");
}
in6_addr tmp;
a.CopyIPv6(&tmp);
return {broker::address(reinterpret_cast<const uint32_t*>(&tmp),
broker::address::family::ipv6,
broker::address::byte_order::network)};
}
case TYPE_SUBNET:
{
IPAddr a;
int length;
switch ( val.subnet_val.prefix.family ) {
case IPv4:
a = IPAddr(val.subnet_val.prefix.in.in4);
length = (val.subnet_val.length - 96);
break;
case IPv6:
a = IPAddr(val.subnet_val.prefix.in.in6);
length = val.subnet_val.length;
break;
default:
reporter->InternalError("unsupported protocol family in threading_val_to_data");
}
in6_addr tmp;
a.CopyIPv6(&tmp);
auto s = broker::address(reinterpret_cast<const uint32_t*>(&tmp),
broker::address::family::ipv6,
broker::address::byte_order::network);
return {broker::subnet(s, length)};
}
case TYPE_DOUBLE:
return {val.double_val};
case TYPE_TIME:
return {broker::time_point(val.double_val)};
case TYPE_INTERVAL:
return {broker::time_duration(val.double_val)};
case TYPE_ENUM:
return {broker::enum_value(std::string(val.string_val.data, val.string_val.length))};
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
return {std::string(val.string_val.data, val.string_val.length)};
case TYPE_TABLE:
{
auto s = broker::set();
for ( int i = 0; i < val.set_val.size; ++i )
{
auto c = bro_broker::threading_val_to_data(val.set_val.vals[i]);
if ( ! c )
return {};
s.emplace(*c);
}
return {move(s)};
}
case TYPE_VECTOR:
{
auto s = broker::vector();
for ( int i = 0; i < val.vector_val.size; ++i )
{
auto c = bro_broker::threading_val_to_data(val.vector_val.vals[i]);
if ( ! c )
return {};
s.emplace_back(*c);
}
return {move(s)};
}
default:
reporter->InternalError("unsupported type %s in threading_val_to_data",
type_name(type));
}
return {};
}
broker::util::optional<broker::data> bro_broker::threading_val_to_data(const threading::Value* v)
{
broker::util::optional<broker::data> d;
if ( v->present )
{
d = threading_val_to_data_internal(v->type, v->val);
if ( ! d )
return {};
}
auto type = broker::record::field(static_cast<uint64_t>(v->type));
auto present = broker::record::field(v->present);
auto data = (v->present) ? broker::record::field(*d) : broker::util::optional<broker::data>();
return {broker::record({move(type), move(present), move(data)})};
};
struct threading_val_converter {
using result_type = bool;
TypeTag type;
threading::Value::_val& val;
result_type operator()(bool a)
{
if ( type == TYPE_BOOL )
{
val.int_val = (a ? 1 : 0);
return true;
}
return false;
}
result_type operator()(uint64_t a)
{
if ( type == TYPE_COUNT || type == TYPE_COUNTER )
{
val.uint_val = a;
return true;
}
return false;
}
result_type operator()(int64_t a)
{
if ( type == TYPE_INT )
{
val.int_val = a;
return true;
}
return false;
}
result_type operator()(double a)
{
if ( type == TYPE_DOUBLE )
{
val.double_val = a;
return true;
}
return false;
}
result_type operator()(const std::string& a)
{
if ( type == TYPE_STRING || type == TYPE_FILE || type == TYPE_FUNC )
{
auto n = a.size();
val.string_val.length = n;
val.string_val.data = new char[n];
memcpy(val.string_val.data, a.data(), n);
return true;
}
return false;
}
result_type operator()(const broker::address& a)
{
if ( type == TYPE_ADDR )
{
auto bits = reinterpret_cast<const in6_addr*>(&a.bytes());
auto b = IPAddr(*bits);
if ( a.is_v4() )
{
val.addr_val.family = IPv4;
b.CopyIPv4(&val.addr_val.in.in4);
return true;
}
if ( a.is_v6() )
{
val.addr_val.family = IPv6;
b.CopyIPv6(&val.addr_val.in.in6);
return true;
}
}
return false;
}
result_type operator()(const broker::subnet& s)
{
if ( type == TYPE_SUBNET )
{
auto bits = reinterpret_cast<const in6_addr*>(&s.network().bytes());
auto a = IPAddr(*bits);
val.subnet_val.length = s.length();
if ( s.network().is_v4() )
{
val.subnet_val.prefix.family = IPv4;
a.CopyIPv4(&val.subnet_val.prefix.in.in4);
val.subnet_val.length += 96;
return true;
}
if ( s.network().is_v6() )
{
val.subnet_val.prefix.family = IPv6;
a.CopyIPv6(&val.subnet_val.prefix.in.in6);
return true;
}
}
return false;
}
result_type operator()(const broker::port& a)
{
if ( type == TYPE_PORT )
{
val.port_val.port = a.number();
val.port_val.proto = bro_broker::to_bro_port_proto(a.type());
return true;
}
return false;
}
result_type operator()(const broker::time_point& a)
{
if ( type == TYPE_TIME )
{
val.double_val = a.value;
return true;
}
return false;
}
result_type operator()(const broker::time_duration& a)
{
if ( type == TYPE_INTERVAL )
{
val.double_val = a.value;
return true;
}
return false;
}
result_type operator()(const broker::enum_value& a)
{
if ( type == TYPE_ENUM )
{
auto n = a.name.size();
val.string_val.length = n;
val.string_val.data = new char[n];
memcpy(val.string_val.data, a.name.data(), n);
return true;
}
return false;
}
result_type operator()(const broker::set& a)
{
if ( type == TYPE_TABLE )
{
val.set_val.size = a.size();
val.set_val.vals = new threading::Value* [val.set_val.size];
auto p = val.set_val.vals;
for ( auto& i : a )
*p++ = bro_broker::data_to_threading_val(move(i));
return true;
}
return false;
}
result_type operator()(const broker::table& a)
{
return false;
}
result_type operator()(const broker::vector& a)
{
if ( type == TYPE_VECTOR )
{
val.vector_val.size = a.size();
val.vector_val.vals = new threading::Value* [val.vector_val.size];
auto p = val.vector_val.vals;
for ( auto& i : a )
*p++ = bro_broker::data_to_threading_val(move(i));
return true;
}
return false;
}
result_type operator()(const broker::record& a)
{
return false;
}
};
threading::Value* bro_broker::data_to_threading_val(broker::data d)
{
auto r = broker::get<broker::record>(d);
if ( ! r )
return nullptr;
auto type = broker::get<uint64_t>(*r->get(0));
auto present = broker::get<bool>(*r->get(1));
auto data = r->get(2);
if ( ! (type && present) )
return nullptr;
if ( *present && ! data )
return nullptr;
auto tv = new threading::Value;
tv->type = static_cast<TypeTag>(*type);
tv->present = *present;
if ( *present && ! broker::visit(threading_val_converter{tv->type, tv->val}, *data) )
{
delete tv;
return nullptr;
}
return tv;
}
broker::data bro_broker::threading_field_to_data(const threading::Field* f)
{
auto name = broker::record::field(f->name);
auto type = broker::record::field(static_cast<uint64_t>(f->type));
auto subtype = broker::record::field(static_cast<uint64_t>(f->subtype));
auto optional = broker::record::field(f->optional);
broker::util::optional<broker::data> secondary;
if ( f->secondary_name )
secondary = {f->secondary_name};
return move(broker::record({name, secondary, type, subtype, optional}));
}
threading::Field* bro_broker::data_to_threading_field(broker::data d)
{
auto r = broker::get<broker::record>(d);
if ( ! r )
return nullptr;
auto name = broker::get<std::string>(*r->get(0));
auto secondary = r->get(1);
auto type = broker::get<uint64_t>(*r->get(2));
auto subtype = broker::get<uint64_t>(*r->get(3));
auto optional = broker::get<bool>(*r->get(4));
if ( ! (name && type && subtype && optional) )
return nullptr;
if ( secondary && ! broker::is<std::string>(*secondary) )
return nullptr;
return new threading::Field(name->c_str(),
secondary ? broker::get<std::string>(*secondary)->c_str() : nullptr,
static_cast<TypeTag>(*type),
static_cast<TypeTag>(*subtype),
*optional);
}

View file

@ -61,6 +61,36 @@ broker::util::optional<broker::data> val_to_data(Val* v);
*/
Val* data_to_val(broker::data d, BroType* type, bool require_log_attr = false);
/**
* Convert a Bro threading::Value to a Broker data value.
* @param v a Bro threading::Value.
* @return a Broker data value if the Bro threading::Value could be converted to one.
*/
broker::util::optional<broker::data> threading_val_to_data(const threading::Value* v);
/**
* Convert a Bro threading::Field to a Broker data value.
* @param f a Bro threading::Field.
* @return a Broker data value if the Bro threading::Field could be converted to one.
*/
broker::data threading_field_to_data(const threading::Field* f);
/**
* Convert a Broker data value to a Bro threading::Value.
* @param d a Broker data value.
* @return a pointer to a new Bro threading::Value or a nullptr if the conversion was not
* possible.
*/
threading::Value* data_to_threading_val(broker::data d);
/**
* Convert a Broker data value to a Bro threading::Value.
* @param d a Broker data value.
* @return a pointer to a new Bro threading::Value or a nullptr if the conversion was not
* possible.
*/
threading::Field* data_to_threading_field(broker::data d);
/**
* A Bro value which wraps a Broker data value.
*/

View file

@ -20,10 +20,17 @@ using namespace std;
VectorType* bro_broker::Manager::vector_of_data_type;
EnumType* bro_broker::Manager::log_id_type;
EnumType* bro_broker::Manager::writer_id_type;
int bro_broker::Manager::send_flags_self_idx;
int bro_broker::Manager::send_flags_peers_idx;
int bro_broker::Manager::send_flags_unsolicited_idx;
struct unref_guard {
unref_guard(Val* v) : val(v) {}
~unref_guard() { Unref(val); }
Val* val;
};
bro_broker::Manager::Manager()
: iosource::IOSource(), next_timestamp(-1)
{
@ -83,6 +90,7 @@ bool bro_broker::Manager::Enable(Val* broker_endpoint_flags)
send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited");
log_id_type = internal_type("Log::ID")->AsEnumType();
writer_id_type = internal_type("Log::Writer")->AsEnumType();
bro_broker::opaque_of_data_type = new OpaqueType("Broker::Data");
bro_broker::opaque_of_set_iterator = new OpaqueType("Broker::SetIterator");
@ -206,8 +214,10 @@ bool bro_broker::Manager::Event(std::string topic, broker::message msg, int flag
return true;
}
bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* info,
int flags)
bool bro_broker::Manager::CreateLog(EnumVal* stream, EnumVal* writer,
const logging::WriterBackend::WriterInfo& info,
int num_fields, const threading::Field* const * fields,
int flags, const string& peer)
{
if ( ! Enabled() )
return false;
@ -221,40 +231,82 @@ bool bro_broker::Manager::Log(EnumVal* stream, RecordVal* columns, RecordType* i
return false;
}
broker::record column_data;
auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum());
for ( auto i = 0u; i < static_cast<size_t>(info->NumFields()); ++i )
if ( ! writer_name )
{
if ( ! info->FieldDecl(i)->FindAttr(ATTR_LOG) )
continue;
auto field_val = columns->LookupWithDefault(i);
if ( ! field_val )
{
column_data.fields.emplace_back(broker::record::field{});
continue;
}
auto opt_field_data = val_to_data(field_val);
Unref(field_val);
if ( ! opt_field_data )
{
reporter->Error("Failed to remotely log stream %s: "
"unsupported type '%s'",
stream_name,
type_name(info->FieldDecl(i)->type->Tag()));
reporter->Error("Failed to remotely log: writer %d doesn't have name",
writer->AsEnum());
return false;
}
column_data.fields.emplace_back(
broker::record::field{move(*opt_field_data)});
auto writer_info = info.ToBroker();
broker::vector fields_data;
for ( auto i = 0; i < num_fields; ++i )
{
auto field_data = threading_field_to_data(fields[i]);
fields_data.push_back(move(field_data));
}
broker::message msg{broker::enum_value{stream_name}, move(column_data)};
// TODO: If peer is given, send message to just that one destination.
std::string topic = std::string("bro/log/") + stream_name;
auto bstream_name = broker::enum_value(move(stream_name));
auto bwriter_name = broker::enum_value(move(writer_name));
broker::message msg{move("create"), move(bstream_name), move(bwriter_name), move(writer_info), move(fields_data)};
endpoint->send(move(topic), move(msg), flags);
return true;
}
bool bro_broker::Manager::Log(EnumVal* stream, EnumVal* writer, string path, int num_vals, const threading::Value* const * vals, int flags)
{
if ( ! Enabled() )
return false;
auto stream_name = stream->Type()->AsEnumType()->Lookup(stream->AsEnum());
if ( ! stream_name )
{
reporter->Error("Failed to remotely log: stream %d doesn't have name",
stream->AsEnum());
return false;
}
auto writer_name = writer->Type()->AsEnumType()->Lookup(writer->AsEnum());
if ( ! writer_name )
{
reporter->Error("Failed to remotely log: writer %d doesn't have name",
writer->AsEnum());
return false;
}
broker::vector vals_data;
for ( auto i = 0; i < num_vals; ++i )
{
auto field_data = threading_val_to_data(vals[i]);
if ( ! field_data )
{
reporter->Error("Failed to remotely log stream %s: "
"unsupported type for field #%d",
stream_name, i);
return false;
}
vals_data.push_back(move(*field_data));
}
std::string topic = std::string("bro/log/") + stream_name;
auto bstream_name = broker::enum_value(move(stream_name));
auto bwriter_name = broker::enum_value(move(writer_name));
broker::message msg{move("write"), move(bstream_name), move(bwriter_name), move(path), move(vals_data)};
endpoint->send(move(topic), move(msg), flags);
return true;
}
@ -639,6 +691,8 @@ void bro_broker::Manager::Process()
{
switch ( u.status ) {
case broker::outgoing_connection_status::tag::established:
log_mgr->SendAllWritersTo(u.peer_name);
if ( Broker::outgoing_connection_established )
{
val_list* vl = new val_list;
@ -684,6 +738,8 @@ void bro_broker::Manager::Process()
{
switch ( u.status ) {
case broker::incoming_connection_status::tag::established:
log_mgr->SendAllWritersTo(u.peer_name);
if ( Broker::incoming_connection_established )
{
val_list* vl = new val_list;
@ -809,12 +865,6 @@ void bro_broker::Manager::Process()
}
}
struct unref_guard {
unref_guard(Val* v) : val(v) {}
~unref_guard() { Unref(val); }
Val* val;
};
for ( auto& ls : log_subscriptions )
{
auto log_messages = ls.second.q.want_pop();
@ -826,59 +876,19 @@ void bro_broker::Manager::Process()
for ( auto& lm : log_messages )
{
if ( lm.size() != 2 )
if ( lm.size() < 1 )
{
reporter->Warning("got bad remote log size: %zd (expect 2)",
lm.size());
reporter->Warning("got bad remote log message, no type field");
continue;
}
if ( ! broker::get<broker::enum_value>(lm[0]) )
{
reporter->Warning("got remote log w/o stream id: %d",
if ( lm[0] == "create" )
ProcessCreateLog(std::move(lm));
else if ( lm[0] == "write" )
ProcessWriteLog(std::move(lm));
else
reporter->Warning("got remote log w/o known type: %d",
static_cast<int>(broker::which(lm[0])));
continue;
}
if ( ! broker::get<broker::record>(lm[1]) )
{
reporter->Warning("got remote log w/o columns: %d",
static_cast<int>(broker::which(lm[1])));
continue;
}
auto stream_id = data_to_val(move(lm[0]), log_id_type);
if ( ! stream_id )
{
reporter->Warning("failed to unpack remote log stream id");
continue;
}
unref_guard stream_id_unreffer{stream_id};
auto columns_type = log_mgr->StreamColumns(stream_id->AsEnumVal());
if ( ! columns_type )
{
reporter->Warning("got remote log for unknown stream: %s",
stream_id->Type()->AsEnumType()->Lookup(
stream_id->AsEnum()));
continue;
}
auto columns = data_to_val(move(lm[1]), columns_type, true);
if ( ! columns )
{
reporter->Warning("failed to unpack remote log stream columns"
" for stream: %s",
stream_id->Type()->AsEnumType()->Lookup(
stream_id->AsEnum()));
continue;
}
log_mgr->Write(stream_id->AsEnumVal(), columns->AsRecordVal());
Unref(columns);
}
}
@ -979,6 +989,202 @@ void bro_broker::Manager::Process()
next_timestamp = -1;
}
bool bro_broker::Manager::ProcessCreateLog(broker::message msg)
{
if ( msg.size() != 5 )
{
reporter->Warning("got bad remote log create size: %zd (expected 5)",
msg.size());
return false;
}
unsigned int idx = 1; // Skip type at index 0.
// Get stream ID.
if ( ! broker::get<broker::enum_value>(msg[idx]) )
{
reporter->Warning("got remote log create w/o stream id: %d",
static_cast<int>(broker::which(msg[idx])));
return false;
}
auto stream_id = data_to_val(move(msg[idx]), log_id_type);
if ( ! stream_id )
{
reporter->Warning("failed to unpack remote log stream id");
return false;
}
unref_guard stream_id_unreffer{stream_id};
++idx;
// Get writer ID.
if ( ! broker::get<broker::enum_value>(msg[idx]) )
{
reporter->Warning("got remote log create w/o writer id: %d",
static_cast<int>(broker::which(msg[idx])));
return false;
}
auto writer_id = data_to_val(move(msg[idx]), writer_id_type);
if ( ! writer_id )
{
reporter->Warning("failed to unpack remote log writer id");
return false;
}
unref_guard writer_id_unreffer{writer_id};
++idx;
// Get writer info.
if ( ! broker::get<broker::record>(msg[idx]) )
{
reporter->Warning("got remote log create w/o writer info id: %d",
static_cast<int>(broker::which(msg[idx])));
return false;
}
auto writer_info = std::unique_ptr<logging::WriterBackend::WriterInfo>(new logging::WriterBackend::WriterInfo);
if ( ! writer_info->FromBroker(std::move(msg[idx])) )
{
reporter->Warning("failed to unpack remote log writer info");
return false;
}
++idx;
// Get log fields.
auto fields_data = broker::get<broker::vector>(msg[idx]);
if ( ! fields_data )
{
reporter->Warning("failed to unpack remote log fields");
return false;
}
auto num_fields = fields_data->size();
auto fields = new threading::Field* [num_fields];
for ( auto i = 0u; i < num_fields; ++i )
{
if ( auto field = data_to_threading_field((*fields_data)[i]) )
fields[i] = field;
else
{
reporter->Warning("failed to convert remote log field # %d", i);
return false;
}
}
if ( ! log_mgr->CreateWriterForRemoteLog(stream_id->AsEnumVal(), writer_id->AsEnumVal(), writer_info.get(), num_fields, fields) )
{
ODesc d;
stream_id->Describe(&d);
reporter->Warning("failed to create remote log stream for %s locally", d.Description());
}
writer_info.release(); // log_mgr took ownership.
return true;
}
bool bro_broker::Manager::ProcessWriteLog(broker::message msg)
{
if ( msg.size() != 5 )
{
reporter->Warning("got bad remote log size: %zd (expected 5)",
msg.size());
return false;
}
unsigned int idx = 1; // Skip type at index 0.
// Get stream ID.
if ( ! broker::get<broker::enum_value>(msg[idx]) )
{
reporter->Warning("got remote log w/o stream id: %d",
static_cast<int>(broker::which(msg[idx])));
return false;
}
auto stream_id = data_to_val(move(msg[idx]), log_id_type);
if ( ! stream_id )
{
reporter->Warning("failed to unpack remote log stream id");
return false;
}
unref_guard stream_id_unreffer{stream_id};
++idx;
// Get writer ID.
if ( ! broker::get<broker::enum_value>(msg[idx]) )
{
reporter->Warning("got remote log w/o writer id: %d",
static_cast<int>(broker::which(msg[idx])));
return false;
}
auto writer_id = data_to_val(move(msg[idx]), writer_id_type);
if ( ! writer_id )
{
reporter->Warning("failed to unpack remote log writer id");
return false;
}
unref_guard writer_id_unreffer{writer_id};
++idx;
// Get path.
auto path = broker::get<std::string>(msg[idx]);
if ( ! path )
{
reporter->Warning("failed to unpack remote log path");
return false;
}
++idx;
// Get log values.
auto vals_data = broker::get<broker::vector>(msg[idx]);
if ( ! vals_data )
{
reporter->Warning("failed to unpack remote log values");
return false;
}
auto num_vals = vals_data->size();
auto vals = new threading::Value* [num_vals];
for ( auto i = 0u; i < num_vals; ++i )
{
if ( auto val = data_to_threading_val((*vals_data)[i]) )
vals[i] = val;
else
{
reporter->Warning("failed to convert remote log arg # %d", i);
return false;
}
}
log_mgr->WriteFromRemote(stream_id->AsEnumVal(), writer_id->AsEnumVal(), *path, num_vals, vals);
return true;
}
bool bro_broker::Manager::AddStore(StoreHandleVal* handle)
{
if ( ! Enabled() )

View file

@ -156,15 +156,34 @@ public:
/**
* Send a log entry to any interested peers. The topic name used is
* implicitly "bro/log/<stream-name>".
* @param stream_id the stream to which the log entry belongs.
* @param columns the data which comprises the log entry.
* @param info the record type corresponding to the log's columns.
* @param stream the stream to which the log entry belongs.
* @param writer the writer to use for outputting this log entry.
* @param path the log path to output the log entry to.
* @param num_vals the number of fields to log.
* @param vals the log values to log, of size num_vals.
* @param flags tune the behavior of how the message is send.
* See the Broker::SendFlags record type.
* @return true if the message is sent successfully.
*/
bool Log(EnumVal* stream_id, RecordVal* columns, RecordType* info,
int flags);
bool Log(EnumVal* stream, EnumVal* writer, string path, int num_vals,
const threading::Value* const * vals, int flags);
/**
* Send a message to create a log stream to any interested peers.
* The log stream may or may not already exist on the receiving side.
* The topic name used is implicitly "bro/log/<stream-name>".
* @param stream the stream to which the log entry belongs.
* @param writer the writer to use for outputting this log entry.
* @param info backend initialization information for the writer.
* @param num_fields the number of fields the log has.
* @param fields the log's fields, of size num_fields.
* @param flags tune the behavior of how the message is send.
* See the Broker::SendFlags record type.
* @param peer If given, send the message only to this peer.
* @return true if the message is sent successfully.
*/
bool CreateLog(EnumVal* stream, EnumVal* writer, const logging::WriterBackend::WriterInfo& info,
int num_fields, const threading::Field* const * fields, int flags, const string& peer = "");
/**
* Automatically send an event to any interested peers whenever it is
@ -325,7 +344,6 @@ public:
static int send_flags_to_int(Val* flags);
private:
// IOSource interface overrides:
void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except) override;
@ -340,6 +358,9 @@ private:
broker::endpoint& Endpoint()
{ return *endpoint; }
bool ProcessCreateLog(broker::message msg);
bool ProcessWriteLog(broker::message msg);
struct QueueWithStats {
broker::message_queue q;
size_t received = 0;
@ -360,6 +381,7 @@ private:
static VectorType* vector_of_data_type;
static EnumType* log_id_type;
static EnumType* writer_id_type;
static int send_flags_self_idx;
static int send_flags_peers_idx;
static int send_flags_unsolicited_idx;

View file

@ -8,7 +8,6 @@ using namespace plugin::Bro_RawReader;
Plugin::Plugin()
{
init = false;
}
plugin::Configuration Plugin::Configure()
@ -23,21 +22,14 @@ plugin::Configuration Plugin::Configure()
void Plugin::InitPreScript()
{
if ( pthread_mutex_init(&fork_mutex, 0) != 0 )
reporter->FatalError("cannot initialize raw reader's mutex");
init = true;
}
void Plugin::Done()
{
pthread_mutex_destroy(&fork_mutex);
init = false;
}
pthread_mutex_t* Plugin::ForkMutex()
std::unique_lock<std::mutex> Plugin::ForkMutex()
{
assert(init);
return &fork_mutex;
return std::move(std::unique_lock<std::mutex>(fork_mutex, std::defer_lock));
}

View file

@ -1,5 +1,7 @@
// See the file in the main distribution directory for copyright.
#include <mutex>
#include "plugin/Plugin.h"
#include "Raw.h"
@ -16,11 +18,10 @@ public:
virtual void InitPreScript();
virtual void Done();
pthread_mutex_t * ForkMutex();
std::unique_lock<std::mutex> ForkMutex();
private:
bool init;
pthread_mutex_t fork_mutex;
std::mutex fork_mutex;
};

View file

@ -96,24 +96,21 @@ bool Raw::SetFDFlags(int fd, int cmd, int flags)
}
bool Raw::LockForkMutex()
std::unique_lock<std::mutex> Raw::AcquireForkMutex()
{
int res = pthread_mutex_lock(plugin::Bro_RawReader::plugin.ForkMutex());
if ( res == 0 )
return true;
auto lock = plugin::Bro_RawReader::plugin.ForkMutex();
Error(Fmt("cannot lock fork mutex: %d", res));
return false;
try
{
lock.lock();
}
bool Raw::UnlockForkMutex()
catch ( const std::system_error& e )
{
int res = pthread_mutex_unlock(plugin::Bro_RawReader::plugin.ForkMutex());
if ( res == 0 )
return true;
reporter->FatalErrorWithCore("cannot lock fork mutex: %s", e.what());
}
Error(Fmt("cannot unlock fork mutex: %d", res));
return false;
return lock;
}
bool Raw::Execute()
@ -126,12 +123,10 @@ bool Raw::Execute()
// never crops up... ("never" meaning I haven't seen in it in
// hundreds of tests using 50+ threads where before I'd see the issue
// w/ just 2 threads ~33% of the time).
if ( ! LockForkMutex() )
return false;
auto lock = AcquireForkMutex();
if ( pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) )
{
UnlockForkMutex();
Error(Fmt("Could not open pipe: %d", errno));
return false;
}
@ -139,7 +134,6 @@ bool Raw::Execute()
childpid = fork();
if ( childpid < 0 )
{
UnlockForkMutex();
Error(Fmt("Could not create child process: %d", errno));
return false;
}
@ -208,8 +202,7 @@ bool Raw::Execute()
}
}
if ( ! UnlockForkMutex() )
return false;
lock.unlock();
ClosePipeEnd(stdout_out);

View file

@ -4,8 +4,8 @@
#define INPUT_READERS_RAW_H
#include <vector>
#include <pthread.h>
#include <memory>
#include <mutex>
#include "input/ReaderBackend.h"
@ -37,8 +37,7 @@ protected:
private:
void ClosePipeEnd(int i);
bool SetFDFlags(int fd, int cmd, int flags);
bool LockForkMutex();
bool UnlockForkMutex();
std::unique_lock<std::mutex> AcquireForkMutex();
bool OpenInput();
bool CloseInput();
@ -87,7 +86,6 @@ private:
};
static const int block_size;
static pthread_mutex_t fork_mutex;
};
}

View file

@ -919,12 +919,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
#endif
}
#ifdef ENABLE_BROKER
if ( stream->enable_remote &&
! broker_mgr->Log(id, columns, stream->columns, stream->remote_flags) )
stream->enable_remote = false;
#endif
Unref(columns);
return true;
@ -1121,23 +1115,46 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
return vals;
}
bool Manager::CreateWriterForRemoteLog(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
int num_fields, const threading::Field* const* fields)
{
return CreateWriter(id, writer, info, num_fields, fields, true, false, true);
}
static void delete_info_and_fields(WriterBackend::WriterInfo* info, int num_fields, const threading::Field* const* fields)
{
for ( int i = 0; i < num_fields; i++ )
delete fields[i];
delete [] fields;
delete info;
}
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote,
const string& instantiating_filter)
{
WriterFrontend* result = 0;
Stream* stream = FindStream(id);
if ( ! stream )
{
// Don't know this stream.
delete_info_and_fields(info, num_fields, fields);
return 0;
}
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info->path));
if ( w != stream->writers.end() )
{
// If we already have a writer for this. That's fine, we just
// return it.
delete_info_and_fields(info, num_fields, fields);
return w->second->writer;
}
WriterInfo* winfo = new WriterInfo;
winfo->type = writer->Ref()->AsEnumVal();
@ -1190,7 +1207,11 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
winfo->info->rotation_interval = winfo->interval;
winfo->info->rotation_base = parse_rotate_base_time(base_time);
winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote);
#ifdef ENABLE_BROKER
winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote, stream->remote_flags);
#else
winfo->writer = new WriterFrontend(*winfo->info, id, writer, local, remote, 0);
#endif
winfo->writer->Init(num_fields, fields);
InstallRotationTimer(winfo);
@ -1207,7 +1228,7 @@ void Manager::DeleteVals(int num_fields, threading::Value** vals)
delete [] vals;
}
bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
bool Manager::WriteFromRemote(EnumVal* id, EnumVal* writer, string path, int num_fields,
threading::Value** vals)
{
Stream* stream = FindStream(id);
@ -1280,6 +1301,34 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
}
}
void Manager::SendAllWritersTo(const string& peer)
{
#ifdef ENABLE_BROKER
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
Stream* stream = (*s);
if ( ! stream )
continue;
for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ )
{
WriterFrontend* writer = i->second->writer;
EnumVal writer_val(i->first.first, internal_type("Log::Writer")->AsEnumType());
broker_mgr->CreateLog((*s)->id,
&writer_val,
*i->second->info,
writer->NumFields(),
writer->Fields(),
stream->remote_flags,
peer);
}
}
#endif
}
bool Manager::SetBuf(EnumVal* id, bool enabled)
{
Stream* stream = FindStream(id);

View file

@ -129,6 +129,52 @@ public:
*/
bool Write(EnumVal* id, RecordVal* columns);
/**
* Create a new log writer frontend. This is exposed so that the
* communication system can recreate remote log streams locally.
*
* @param id The enum value corresponding to the log stream.
*
* @param writer The enum value corresponding to the desired log writer.
*
* @param info A fully initialized object defining the
* characteristics of the backend writer instance. The method takes
* ownership of this.
*
* @param num_fields The number of log fields to write.
*
* @param vals An array of log fields to write, of size num_fields.
* The method takes ownership of the array.
*
* @return Returns true if the writer was successfully created.
*/
bool CreateWriterForRemoteLog(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
int num_fields, const threading::Field* const* fields);
/**
* Writes out log entries that have already passed through all
* filters (and have raised any events). This is meant called for logs
* received already processed from remote.
*
* @param stream The enum value corresponding to the log stream.
*
* @param writer The enum value corresponding to the desired log writer.
*
* @param path The path of the target log stream to write to.
*
* @param num_fields The number of log values to write.
*
* @param vals An array of log values to write, of size num_fields.
* The method takes ownership of the array.
*/
bool WriteFromRemote(EnumVal* stream, EnumVal* writer, string path,
int num_fields, threading::Value** vals);
/**
* Announces all instantiated writers to a given Broker peer.
*/
void SendAllWritersTo(const string& peer);
/**
* Sets log streams buffering state. This adjusts all associated
* writers to the new state.
@ -203,10 +249,6 @@ protected:
int num_fields, const threading::Field* const* fields,
bool local, bool remote, bool from_remote, const string& instantiating_filter="");
// Takes ownership of values..
bool Write(EnumVal* id, EnumVal* writer, string path,
int num_fields, threading::Value** vals);
// Announces all instantiated writers to peer.
void SendAllWritersTo(RemoteSerializer::PeerID peer);

View file

@ -119,6 +119,65 @@ bool WriterBackend::WriterInfo::Write(SerializationFormat* fmt) const
return true;
}
#ifdef ENABLE_BROKER
broker::data WriterBackend::WriterInfo::ToBroker() const
{
auto bpath = broker::record::field(path);
auto brotation_base = broker::record::field(rotation_base);
auto brotation_interval = broker::record::field(rotation_interval);
auto bnetwork_time = broker::record::field(network_time);
auto t = broker::table();
for ( config_map::const_iterator i = config.begin(); i != config.end(); ++i )
{
auto key = std::string(i->first);
auto value = std::string(i->second);
t.insert(std::make_pair(key, value));
}
auto bconfig = broker::record::field(move(t));
return move(broker::record({bpath, brotation_base, brotation_interval, bnetwork_time, bconfig}));
}
bool WriterBackend::WriterInfo::FromBroker(broker::data d)
{
auto r = broker::get<broker::record>(d);
if ( ! r )
return false;
auto bpath = broker::get<std::string>(*r->get(0));
auto brotation_base = broker::get<double>(*r->get(1));
auto brotation_interval = broker::get<double>(*r->get(2));
auto bnetwork_time = broker::get<double>(*r->get(3));
auto bconfig = broker::get<broker::table>(*r->get(4));
if ( ! (bpath && brotation_base && brotation_interval && bnetwork_time && bconfig) )
return false;
path = copy_string(bpath->c_str());
rotation_base = *brotation_base;
rotation_interval = *brotation_interval;
network_time = *bnetwork_time;
for ( auto i : *bconfig )
{
auto k = broker::get<std::string>(i.first);
auto v = broker::get<std::string>(i.second);
if ( ! (k && v) )
return false;
auto p = std::make_pair(copy_string(k->c_str()), copy_string(v->c_str()));
config.insert(p);
}
return true;
}
#endif
WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
{
num_fields = 0;

View file

@ -7,6 +7,10 @@
#include "threading/MsgThread.h"
#ifdef ENABLE_BROKER
#include "broker/Data.h"
#endif
#include "Component.h"
class RemoteSerializer;
@ -110,15 +114,18 @@ public:
}
}
private:
const WriterInfo& operator=(const WriterInfo& other); // Disable.
friend class ::RemoteSerializer;
// Note, these need to be adapted when changing the struct's
// fields. They serialize/deserialize the struct.
bool Read(SerializationFormat* fmt);
bool Write(SerializationFormat* fmt) const;
#ifdef ENABLE_BROKER
broker::data ToBroker() const;
bool FromBroker(broker::data d);
#endif
private:
const WriterInfo& operator=(const WriterInfo& other); // Disable.
};
/**

View file

@ -2,6 +2,10 @@
#include "Net.h"
#include "threading/SerialTypes.h"
#ifdef ENABLE_BROKER
#include "broker/Manager.h"
#endif
#include "Manager.h"
#include "WriterFrontend.h"
#include "WriterBackend.h"
@ -97,7 +101,7 @@ private:
using namespace logging;
WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote)
WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote, int arg_remote_flags)
{
stream = arg_stream;
writer = arg_writer;
@ -108,6 +112,7 @@ WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVa
buf = true;
local = arg_local;
remote = arg_remote;
remote_flags = arg_remote_flags;
write_buffer = 0;
write_buffer_pos = 0;
info = new WriterBackend::WriterInfo(arg_info);
@ -167,12 +172,23 @@ void WriterFrontend::Init(int arg_num_fields, const Field* const * arg_fields)
backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields));
if ( remote )
{
remote_serializer->SendLogCreateWriter(stream,
writer,
*info,
arg_num_fields,
arg_fields);
#ifdef ENABLE_BROKER
broker_mgr->CreateLog(stream,
writer,
*info,
arg_num_fields,
arg_fields,
remote_flags);
#endif
}
}
void WriterFrontend::Write(int arg_num_fields, Value** vals)
@ -191,12 +207,23 @@ void WriterFrontend::Write(int arg_num_fields, Value** vals)
}
if ( remote )
{
remote_serializer->SendLogWrite(stream,
writer,
info->path,
num_fields,
vals);
#ifdef ENABLE_BROKER
broker_mgr->Log(stream,
writer,
info->path,
num_fields,
vals,
remote_flags);
#endif
}
if ( ! backend )
{
DeleteVals(arg_num_fields, vals);

View file

@ -34,16 +34,17 @@ public:
*
* info: The meta information struct for the writer.
*
* writer_name: A descriptive name for the writer's type.
*
* local: If true, the writer will instantiate a local backend.
*
* remote: If true, the writer will forward all data to remote
* remote: If true, the writer will forward logs to remote
* clients.
*
* remote_flags: Broker flags controlling where remote logs are
* propagated to.
*
* Frontends must only be instantiated by the main thread.
*/
WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote);
WriterFrontend(const WriterBackend::WriterInfo& info, EnumVal* stream, EnumVal* writer, bool local, bool remote, int remote_flags);
/**
* Destructor.
@ -214,6 +215,7 @@ protected:
bool buf; // True if buffering is enabled (default).
bool local; // True if logging locally.
bool remote; // True if loggin remotely.
int remote_flags; // Broker propagation flags.
const char* name; // Descriptive name of the
WriterBackend::WriterInfo* info; // The writer information.

View file

@ -39,7 +39,7 @@ enum HookType {
HOOK_DRAIN_EVENTS, //< Activates Plugin::HookDrainEvents()
HOOK_UPDATE_NETWORK_TIME, //< Activates Plugin::HookUpdateNetworkTime.
HOOK_BRO_OBJ_DTOR, //< Activates Plugin::HookBroObjDtor.
HOOK_SETUP_ANALYZER_TREE, //< Activates Plugin::HookAddToAnalyzerTree
HOOK_SETUP_ANALYZER_TREE, //< Activates Plugin::HookSetupAnalyzerTree
// Meta hooks.
META_HOOK_PRE, //< Activates Plugin::MetaHookPre().
@ -642,6 +642,13 @@ protected:
*/
virtual void HookUpdateNetworkTime(double network_time);
/**
* Hook that executes when a connection's initial analyzer tree
* has been fully set up. The hook can manipulate the tree at this time,
* for example by adding further analyzers.
*
* @param conn The connection.
*/
virtual void HookSetupAnalyzerTree(Connection *conn);
/**

View file

@ -5,6 +5,7 @@
#include "bro-config.h"
#include "BasicThread.h"
#include "Manager.h"
#include "pthread.h"
#ifdef HAVE_LINUX
#include <sys/prctl.h>
@ -21,7 +22,6 @@ BasicThread::BasicThread()
started = false;
terminating = false;
killed = false;
pthread = 0;
buf_len = STD_FMT_BUF_LEN;
buf = (char*) safe_malloc(buf_len);
@ -50,6 +50,7 @@ void BasicThread::SetName(const char* arg_name)
void BasicThread::SetOSName(const char* arg_name)
{
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value, "libstdc++ doesn't use pthread_t");
#ifdef HAVE_LINUX
prctl(PR_SET_NAME, arg_name, 0, 0, 0);
@ -60,7 +61,7 @@ void BasicThread::SetOSName(const char* arg_name)
#endif
#ifdef FREEBSD
pthread_set_name_np(pthread_self(), arg_name, arg_name);
pthread_set_name_np(thread.native_handle(), arg_name, arg_name);
#endif
}
@ -108,9 +109,7 @@ void BasicThread::Start()
started = true;
int err = pthread_create(&pthread, 0, BasicThread::launcher, this);
if ( err != 0 )
reporter->FatalError("Cannot create thread %s: %s", name, Strerror(err));
thread = std::thread(&BasicThread::launcher, this);
DBG_LOG(DBG_THREADING, "Started thread %s", name);
@ -147,17 +146,21 @@ void BasicThread::Join()
if ( ! started )
return;
if ( ! pthread )
if ( ! thread.joinable() )
return;
assert(terminating);
if ( pthread_join(pthread, 0) != 0 )
reporter->FatalError("Failure joining thread %s", name);
try
{
thread.join();
}
catch ( const std::system_error& e )
{
reporter->FatalError("Failure joining thread %s with error %s", name, e.what());
}
DBG_LOG(DBG_THREADING, "Joined with thread %s", name);
pthread = 0;
}
void BasicThread::Kill()
@ -180,6 +183,7 @@ void BasicThread::Done()
void* BasicThread::launcher(void *arg)
{
static_assert(std::is_same<std::thread::native_handle_type, pthread_t>::value, "libstdc++ doesn't use pthread_t");
BasicThread* thread = (BasicThread *)arg;
// Block signals in thread. We handle signals only in the main

View file

@ -2,8 +2,7 @@
#ifndef THREADING_BASICTHREAD_H
#define THREADING_BASICTHREAD_H
#include <pthread.h>
#include <semaphore.h>
#include <thread>
#include "util.h"
@ -35,6 +34,9 @@ public:
*/
BasicThread();
BasicThread(BasicThread const&) = delete;
BasicThread& operator =(BasicThread const&) = delete;
/**
* Returns a descriptive name for the thread. If not set via
* SetName(). If not set, a default name is choosen automatically.
@ -192,11 +194,11 @@ protected:
void Done();
private:
// pthread entry function.
// thread entry function.
static void* launcher(void *arg);
const char* name;
pthread_t pthread;
std::thread thread;
bool started; // Set to to true once running.
bool terminating; // Set to to true to signal termination.
bool killed; // Set to true once forcefully killed.

View file

@ -2,8 +2,6 @@
#ifndef THREADING_MSGTHREAD_H
#define THREADING_MSGTHREAD_H
#include <pthread.h>
#include "DebugLogger.h"
#include "BasicThread.h"

View file

@ -1,7 +1,8 @@
#ifndef THREADING_QUEUE_H
#define THREADING_QUEUE_H
#include <pthread.h>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <deque>
#include <stdint.h>
@ -22,7 +23,7 @@ namespace threading {
*
* All Queue instances must be instantiated by Bro's main thread.
*
* TODO: Unclear how critical performance is for this qeueue. We could like;y
* TODO: Unclear how critical performance is for this qeueue. We could likely
* optimize it further if helpful.
*/
template<typename T>
@ -71,9 +72,10 @@ public:
*/
bool MaybeReady() { return (num_reads != num_writes); }
/** Wake up the reader if it's currently blocked for input. This is
primarily to give it a chance to check termination quickly.
**/
/**
* Wake up the reader if it's currently blocked for input. This is
* primarily to give it a chance to check termination quickly.
*/
void WakeUp();
/**
@ -94,14 +96,17 @@ public:
* Returns statistics about the queue's usage.
*
* @param stats A pointer to a structure that will be filled with
* current numbers. */
* current numbers.
*/
void GetStats(Stats* stats);
private:
static const int NUM_QUEUES = 8;
pthread_mutex_t mutex[NUM_QUEUES]; // Mutex protected shared accesses.
pthread_cond_t has_data[NUM_QUEUES]; // Signals when data becomes available
std::vector<std::unique_lock<std::mutex>> LocksForAllQueues();
std::mutex mutex[NUM_QUEUES]; // Mutex protected shared accesses.
std::condition_variable has_data[NUM_QUEUES]; // Signals when data becomes available
std::queue<T> messages[NUM_QUEUES]; // Actually holds the queued messages
int read_ptr; // Where the next operation will read from
@ -115,17 +120,18 @@ private:
uint64_t num_writes;
};
inline static void safe_lock(pthread_mutex_t* mutex)
inline static std::unique_lock<std::mutex> acquire_lock(std::mutex& m)
{
int res = pthread_mutex_lock(mutex);
if ( res != 0 )
reporter->FatalErrorWithCore("cannot lock mutex: %d(%s)", res, strerror(res));
try
{
return std::move(std::unique_lock<std::mutex>(m));
}
inline static void safe_unlock(pthread_mutex_t* mutex)
catch ( const std::system_error& e )
{
if ( pthread_mutex_unlock(mutex) != 0 )
reporter->FatalErrorWithCore("cannot unlock mutex");
reporter->FatalErrorWithCore("cannot lock mutex: %s", e.what());
// Never gets here.
throw std::exception();
}
}
template<typename T>
@ -136,50 +142,28 @@ inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
num_reads = num_writes = 0;
reader = arg_reader;
writer = arg_writer;
for( int i = 0; i < NUM_QUEUES; ++i )
{
if ( pthread_cond_init(&has_data[i], 0) != 0 )
reporter->FatalError("cannot init queue condition variable");
if ( pthread_mutex_init(&mutex[i], 0) != 0 )
reporter->FatalError("cannot init queue mutex");
}
}
template<typename T>
inline Queue<T>::~Queue()
{
for( int i = 0; i < NUM_QUEUES; ++i )
{
pthread_cond_destroy(&has_data[i]);
pthread_mutex_destroy(&mutex[i]);
}
}
template<typename T>
inline T Queue<T>::Get()
{
safe_lock(&mutex[read_ptr]);
auto lock = acquire_lock(mutex[read_ptr]);
int old_read_ptr = read_ptr;
if ( messages[read_ptr].empty() && ! ((reader && reader->Killed()) || (writer && writer->Killed())) )
{
struct timespec ts;
ts.tv_sec = time(0) + 5;
ts.tv_nsec = 0;
pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts);
safe_unlock(&mutex[read_ptr]);
return 0;
if ( has_data[read_ptr].wait_for(lock, std::chrono::seconds(5)) == std::cv_status::timeout )
return nullptr;
}
else if ( messages[read_ptr].empty() )
{
safe_unlock(&mutex[read_ptr]);
return 0;
}
if ( messages[read_ptr].empty() )
return nullptr;
T data = messages[read_ptr].front();
messages[read_ptr].pop();
@ -187,15 +171,13 @@ inline T Queue<T>::Get()
read_ptr = (read_ptr + 1) % NUM_QUEUES;
++num_reads;
safe_unlock(&mutex[old_read_ptr]);
return data;
}
template<typename T>
inline void Queue<T>::Put(T data)
{
safe_lock(&mutex[write_ptr]);
auto lock = acquire_lock(mutex[write_ptr]);
int old_write_ptr = write_ptr;
@ -203,43 +185,59 @@ inline void Queue<T>::Put(T data)
messages[write_ptr].push(data);
if ( need_signal )
pthread_cond_signal(&has_data[write_ptr]);
write_ptr = (write_ptr + 1) % NUM_QUEUES;
++num_writes;
safe_unlock(&mutex[old_write_ptr]);
if ( need_signal )
{
lock.unlock();
has_data[old_write_ptr].notify_one();
}
}
template<typename T>
inline bool Queue<T>::Ready()
{
safe_lock(&mutex[read_ptr]);
auto lock = acquire_lock(mutex[read_ptr]);
bool ret = (messages[read_ptr].size());
safe_unlock(&mutex[read_ptr]);
return ret;
}
template<typename T>
inline std::vector<std::unique_lock<std::mutex>> Queue<T>::LocksForAllQueues()
{
std::vector<std::unique_lock<std::mutex>> locks;
try
{
for ( int i = 0; i < NUM_QUEUES; i++ )
locks.emplace_back(std::unique_lock<std::mutex>(mutex[i]));
}
catch ( const std::system_error& e )
{
reporter->FatalErrorWithCore("cannot lock all mutexes: %s", e.what());
// Never gets here.
throw std::exception();
}
return std::move(locks);
}
template<typename T>
inline uint64_t Queue<T>::Size()
{
// Need to lock all queues.
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_lock(&mutex[i]);
auto locks = LocksForAllQueues();
uint64_t size = 0;
for ( int i = 0; i < NUM_QUEUES; i++ )
size += messages[i].size();
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_unlock(&mutex[i]);
return size;
}
@ -248,29 +246,19 @@ inline void Queue<T>::GetStats(Stats* stats)
{
// To be safe, we look all queues. That's probably unneccessary, but
// doesn't really hurt.
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_lock(&mutex[i]);
auto locks = LocksForAllQueues();
stats->num_reads = num_reads;
stats->num_writes = num_writes;
for ( int i = 0; i < NUM_QUEUES; i++ )
safe_unlock(&mutex[i]);
}
template<typename T>
inline void Queue<T>::WakeUp()
{
for ( int i = 0; i < NUM_QUEUES; i++ )
{
safe_lock(&mutex[i]);
pthread_cond_signal(&has_data[i]);
safe_unlock(&mutex[i]);
}
has_data[i].notify_all();
}
}
#endif

View file

@ -1,6 +0,0 @@
wrote log, [msg=ping, nolog=no, num=0]
wrote log, [msg=ping, nolog=no, num=1]
wrote log, [msg=ping, nolog=no, num=2]
wrote log, [msg=ping, nolog=no, num=3]
wrote log, [msg=ping, nolog=no, num=4]
wrote log, [msg=ping, nolog=no, num=5]

View file

@ -3,7 +3,7 @@
#empty_field (empty)
#unset_field -
#path test
#open 2015-01-26-22-47-11
#open 2017-02-27-16-21-20
#fields msg num
#types string count
ping 0
@ -12,4 +12,4 @@ ping 2
ping 3
ping 4
ping 5
#close 2015-01-26-22-47-11
#close 2017-02-27-16-21-20

View file

@ -3,7 +3,7 @@
#empty_field (empty)
#unset_field -
#path test
#open 2015-01-26-22-47-11
#open 2017-02-27-16-21-19
#fields msg num
#types string count
ping 0
@ -12,4 +12,4 @@ ping 2
ping 3
ping 4
ping 5
#close 2015-01-26-22-47-11
#close 2017-02-27-16-21-20

View file

@ -0,0 +1,10 @@
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path test
#open 2017-02-11-02-17-35
#fields b i e c p sn a d t iv s sc ss se vc ve f
#types bool int enum count port subnet addr double time interval string set[count] set[string] set[string] vector[count] vector[string] func
T -42 Test::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1486779455.703438 100.000000 hurz 1 AA (empty) 10,20,30 (empty) foo\x0a{ \x0aif (0 < i) \x0a\x09return (Foo);\x0aelse\x0a\x09return (Bar);\x0a\x0a}
#close 2017-02-11-02-17-35

View file

@ -0,0 +1 @@
Broker::outgoing_connection_established, 127.0.0.1, 9999/tcp

View file

@ -0,0 +1,10 @@
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path test
#open 2017-02-11-02-17-35
#fields b i e c p sn a d t iv s sc ss se vc ve f
#types bool int enum count port subnet addr double time interval string set[count] set[string] set[string] vector[count] vector[string] func
T -42 Test::LOG 21 123 10.0.0.0/24 1.2.3.4 3.14 1486779455.703438 100.000000 hurz 1 AA (empty) 10,20,30 (empty) foo\x0a{ \x0aif (0 < i) \x0a\x09return (Foo);\x0aelse\x0a\x09return (Bar);\x0a\x0a}
#close 2017-02-11-02-17-36

View file

@ -3,7 +3,7 @@
#empty_field (empty)
#unset_field -
#path loaded_scripts
#open 2016-11-02-17-25-26
#open 2017-02-28-17-15-30
#fields name
#types string
scripts/base/init-bare.bro
@ -58,7 +58,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/top-k.bif.bro
build/scripts/base/bif/plugins/__load__.bro
build/scripts/base/bif/plugins/Bro_ARP.events.bif.bro
build/scripts/base/bif/plugins/Bro_AYIYA.events.bif.bro
build/scripts/base/bif/plugins/Bro_BackDoor.events.bif.bro
build/scripts/base/bif/plugins/Bro_BitTorrent.events.bif.bro
build/scripts/base/bif/plugins/Bro_ConnSize.events.bif.bro
@ -74,7 +73,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/plugins/Bro_FTP.events.bif.bro
build/scripts/base/bif/plugins/Bro_FTP.functions.bif.bro
build/scripts/base/bif/plugins/Bro_Gnutella.events.bif.bro
build/scripts/base/bif/plugins/Bro_GSSAPI.types.bif.bro
build/scripts/base/bif/plugins/Bro_GSSAPI.events.bif.bro
build/scripts/base/bif/plugins/Bro_GTPv1.events.bif.bro
build/scripts/base/bif/plugins/Bro_HTTP.events.bif.bro
@ -96,7 +94,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/plugins/Bro_NTLM.types.bif.bro
build/scripts/base/bif/plugins/Bro_NTLM.events.bif.bro
build/scripts/base/bif/plugins/Bro_NTP.events.bif.bro
build/scripts/base/bif/plugins/Bro_PIA.events.bif.bro
build/scripts/base/bif/plugins/Bro_POP3.events.bif.bro
build/scripts/base/bif/plugins/Bro_RADIUS.events.bif.bro
build/scripts/base/bif/plugins/Bro_RDP.events.bif.bro
@ -150,7 +147,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/plugins/Bro_Teredo.events.bif.bro
build/scripts/base/bif/plugins/Bro_UDP.events.bif.bro
build/scripts/base/bif/plugins/Bro_XMPP.events.bif.bro
build/scripts/base/bif/plugins/Bro_ZIP.events.bif.bro
build/scripts/base/bif/plugins/Bro_FileEntropy.events.bif.bro
build/scripts/base/bif/plugins/Bro_FileExtract.events.bif.bro
build/scripts/base/bif/plugins/Bro_FileExtract.functions.bif.bro
@ -171,4 +167,4 @@ scripts/base/init-bare.bro
build/scripts/base/bif/plugins/Bro_SQLiteWriter.sqlite.bif.bro
scripts/policy/misc/loaded-scripts.bro
scripts/base/utils/paths.bro
#close 2016-11-02-17-25-26
#close 2017-02-28-17-15-30

View file

@ -3,7 +3,7 @@
#empty_field (empty)
#unset_field -
#path loaded_scripts
#open 2016-11-02-17-25-18
#open 2017-02-28-17-19-41
#fields name
#types string
scripts/base/init-bare.bro
@ -58,7 +58,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/top-k.bif.bro
build/scripts/base/bif/plugins/__load__.bro
build/scripts/base/bif/plugins/Bro_ARP.events.bif.bro
build/scripts/base/bif/plugins/Bro_AYIYA.events.bif.bro
build/scripts/base/bif/plugins/Bro_BackDoor.events.bif.bro
build/scripts/base/bif/plugins/Bro_BitTorrent.events.bif.bro
build/scripts/base/bif/plugins/Bro_ConnSize.events.bif.bro
@ -74,7 +73,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/plugins/Bro_FTP.events.bif.bro
build/scripts/base/bif/plugins/Bro_FTP.functions.bif.bro
build/scripts/base/bif/plugins/Bro_Gnutella.events.bif.bro
build/scripts/base/bif/plugins/Bro_GSSAPI.types.bif.bro
build/scripts/base/bif/plugins/Bro_GSSAPI.events.bif.bro
build/scripts/base/bif/plugins/Bro_GTPv1.events.bif.bro
build/scripts/base/bif/plugins/Bro_HTTP.events.bif.bro
@ -96,7 +94,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/plugins/Bro_NTLM.types.bif.bro
build/scripts/base/bif/plugins/Bro_NTLM.events.bif.bro
build/scripts/base/bif/plugins/Bro_NTP.events.bif.bro
build/scripts/base/bif/plugins/Bro_PIA.events.bif.bro
build/scripts/base/bif/plugins/Bro_POP3.events.bif.bro
build/scripts/base/bif/plugins/Bro_RADIUS.events.bif.bro
build/scripts/base/bif/plugins/Bro_RDP.events.bif.bro
@ -150,7 +147,6 @@ scripts/base/init-bare.bro
build/scripts/base/bif/plugins/Bro_Teredo.events.bif.bro
build/scripts/base/bif/plugins/Bro_UDP.events.bif.bro
build/scripts/base/bif/plugins/Bro_XMPP.events.bif.bro
build/scripts/base/bif/plugins/Bro_ZIP.events.bif.bro
build/scripts/base/bif/plugins/Bro_FileEntropy.events.bif.bro
build/scripts/base/bif/plugins/Bro_FileExtract.events.bif.bro
build/scripts/base/bif/plugins/Bro_FileExtract.functions.bif.bro
@ -359,4 +355,4 @@ scripts/base/init-default.bro
scripts/base/misc/find-filtered-trace.bro
scripts/base/misc/version.bro
scripts/policy/misc/loaded-scripts.bro
#close 2016-11-02-17-25-18
#close 2017-02-28-17-19-41

View file

@ -247,7 +247,7 @@
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG)) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Communication::LOG)) -> <no result>
0.000000 MetaHookPost CallFunction(Log::add_default_filter, <frame>, (Conn::LOG)) -> <no result>
@ -377,7 +377,7 @@
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])) -> <no result>
0.000000 MetaHookPost CallFunction(NetControl::check_plugins, <frame>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(NetControl::init, <null>, ()) -> <no result>
0.000000 MetaHookPost CallFunction(Notice::want_pp, <frame>, ()) -> <no result>
@ -416,7 +416,6 @@
0.000000 MetaHookPost LoadFile(../main) -> -1
0.000000 MetaHookPost LoadFile(../plugin) -> -1
0.000000 MetaHookPost LoadFile(./Bro_ARP.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_AYIYA.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_AsciiReader.ascii.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_AsciiWriter.ascii.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_BackDoor.events.bif.bro) -> -1
@ -440,7 +439,6 @@
0.000000 MetaHookPost LoadFile(./Bro_FileHash.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_Finger.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_GSSAPI.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_GSSAPI.types.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_GTPv1.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_Gnutella.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_HTTP.events.bif.bro) -> -1
@ -465,7 +463,6 @@
0.000000 MetaHookPost LoadFile(./Bro_NetBIOS.functions.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_NoneWriter.none.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_PE.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_PIA.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_POP3.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_RADIUS.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_RDP.events.bif.bro) -> -1
@ -528,7 +525,6 @@
0.000000 MetaHookPost LoadFile(./Bro_X509.functions.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_X509.types.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_XMPP.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./Bro_ZIP.events.bif.bro) -> -1
0.000000 MetaHookPost LoadFile(./acld) -> -1
0.000000 MetaHookPost LoadFile(./addrs) -> -1
0.000000 MetaHookPost LoadFile(./analyzer.bif.bro) -> -1
@ -968,7 +964,7 @@
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
0.000000 MetaHookPre CallFunction(Log::__create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
0.000000 MetaHookPre CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(Log::__write, <frame>, (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Cluster::LOG))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Communication::LOG))
0.000000 MetaHookPre CallFunction(Log::add_default_filter, <frame>, (Conn::LOG))
@ -1098,7 +1094,7 @@
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird]))
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509]))
0.000000 MetaHookPre CallFunction(Log::create_stream, <frame>, (mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql]))
0.000000 MetaHookPre CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(Log::write, <frame>, (PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T]))
0.000000 MetaHookPre CallFunction(NetControl::check_plugins, <frame>, ())
0.000000 MetaHookPre CallFunction(NetControl::init, <null>, ())
0.000000 MetaHookPre CallFunction(Notice::want_pp, <frame>, ())
@ -1137,7 +1133,6 @@
0.000000 MetaHookPre LoadFile(../main)
0.000000 MetaHookPre LoadFile(../plugin)
0.000000 MetaHookPre LoadFile(./Bro_ARP.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_AYIYA.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_AsciiReader.ascii.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_AsciiWriter.ascii.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_BackDoor.events.bif.bro)
@ -1161,7 +1156,6 @@
0.000000 MetaHookPre LoadFile(./Bro_FileHash.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_Finger.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_GSSAPI.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_GSSAPI.types.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_GTPv1.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_Gnutella.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_HTTP.events.bif.bro)
@ -1186,7 +1180,6 @@
0.000000 MetaHookPre LoadFile(./Bro_NetBIOS.functions.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_NoneWriter.none.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_PE.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_PIA.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_POP3.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_RADIUS.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_RDP.events.bif.bro)
@ -1249,7 +1242,6 @@
0.000000 MetaHookPre LoadFile(./Bro_X509.functions.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_X509.types.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_XMPP.events.bif.bro)
0.000000 MetaHookPre LoadFile(./Bro_ZIP.events.bif.bro)
0.000000 MetaHookPre LoadFile(./acld)
0.000000 MetaHookPre LoadFile(./addrs)
0.000000 MetaHookPre LoadFile(./analyzer.bif.bro)
@ -1688,7 +1680,7 @@
0.000000 | HookCallFunction Log::__create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
0.000000 | HookCallFunction Log::__create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
0.000000 | HookCallFunction Log::__create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction Log::__write(PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction Log::add_default_filter(Cluster::LOG)
0.000000 | HookCallFunction Log::add_default_filter(Communication::LOG)
0.000000 | HookCallFunction Log::add_default_filter(Conn::LOG)
@ -1818,7 +1810,7 @@
0.000000 | HookCallFunction Log::create_stream(Weird::LOG, [columns=<no value description>, ev=Weird::log_weird, path=weird])
0.000000 | HookCallFunction Log::create_stream(X509::LOG, [columns=<no value description>, ev=X509::log_x509, path=x509])
0.000000 | HookCallFunction Log::create_stream(mysql::LOG, [columns=<no value description>, ev=MySQL::log_mysql, path=mysql])
0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1487443758.386684, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction Log::write(PacketFilter::LOG, [ts=1488302456.440387, node=bro, filter=ip or not ip, init=T, success=T])
0.000000 | HookCallFunction NetControl::check_plugins()
0.000000 | HookCallFunction NetControl::init()
0.000000 | HookCallFunction Notice::want_pp()

View file

@ -1,3 +1,3 @@
KRB_AP_REQUEST
[pvno=5, realm=VLADG.NET, service_name=krbtgt/VLADG.NET, cipher=18, ciphertext=\x04\x81\xfa{\x9fY\xd0f\x8dS\xf4I\x88\x04\xfa\xc1\xd8m\xa2\xb7+\xbb\x19\xcag\x0c\x13\xd1g*\xfc\x18\xd1\xb1\x80!\xbd\x85\xec\xf9\x9b\xfa-\x18\xb6\xf5h\x91\xe7\x99\xf4\xdb\x93\xa0\xc7\x90\x1e\xa9\x95v\xd3\x12\xfa,9\x1d\x0b\xd0\xa1\xd25\x0f\x1f[G\xdf\xd0\xbbd\x06$2\xd1\xae\x130qZiY\x07@\xe9\xf9\xff\xa4\x9a\xd4\x09\xf0\x0d\xc1R\x10M\xbdKOV\xfd\xf6\x13\xf6\x9a\x95N\xdf!\xf6x\x94\xd8j\xa5\xdcp\xa8\x04\x99\x02x\xdb$\xd8\xfa_o\x8dV\xc8\x0a\xfe\x00\xf3&c\x0c8\xd1\xd0\xe9\x8e\xab\xfe&\xfe\x00\x8d$\x98I\xe5\x8d\x94rM4%\xd8\xfe\xa9\x08\x06\xc6\x95H7\xf7HCq\xb9\x0d$\x95?\x83B\x82\xdd\xea\xc3f3\xcc\xbb\x09\x0d-\x09;\xa6i%\xcd\xba\x11\xd4\xe0\x12w\xd0G&\xdaj\x82\x7f;\xf3\x1d\x10\xa4l\x06\x16l\x1bc\xa1\xd1\x15!\x00\x8a\xff\x8a\x06\xe7U^:<d\xba"\x02I\xf0\xce\xc7\xad\xb2]
[pvno=5, realm=VLADG.NET, service_name=krbtgt/VLADG.NET, cipher=18, ciphertext={\x9fY\xd0f\x8dS\xf4I\x88\x04\xfa\xc1\xd8m\xa2\xb7+\xbb\x19\xcag\x0c\x13\xd1g*\xfc\x18\xd1\xb1\x80!\xbd\x85\xec\xf9\x9b\xfa-\x18\xb6\xf5h\x91\xe7\x99\xf4\xdb\x93\xa0\xc7\x90\x1e\xa9\x95v\xd3\x12\xfa,9\x1d\x0b\xd0\xa1\xd25\x0f\x1f[G\xdf\xd0\xbbd\x06$2\xd1\xae\x130qZiY\x07@\xe9\xf9\xff\xa4\x9a\xd4\x09\xf0\x0d\xc1R\x10M\xbdKOV\xfd\xf6\x13\xf6\x9a\x95N\xdf!\xf6x\x94\xd8j\xa5\xdcp\xa8\x04\x99\x02x\xdb$\xd8\xfa_o\x8dV\xc8\x0a\xfe\x00\xf3&c\x0c8\xd1\xd0\xe9\x8e\xab\xfe&\xfe\x00\x8d$\x98I\xe5\x8d\x94rM4%\xd8\xfe\xa9\x08\x06\xc6\x95H7\xf7HCq\xb9\x0d$\x95?\x83B\x82\xdd\xea\xc3f3\xcc\xbb\x09\x0d-\x09;\xa6i%\xcd\xba\x11\xd4\xe0\x12w\xd0G&\xdaj\x82\x7f;\xf3\x1d\x10\xa4l\x06\x16l\x1bc\xa1\xd1\x15!\x00\x8a\xff\x8a\x06\xe7U^:<d\xba"\x02I\xf0\xce\xc7\xad\xb2]
[use_session_key=F, mutual_required=F]

View file

@ -3,8 +3,8 @@
#empty_field (empty)
#unset_field -
#path kerberos
#open 2017-02-18-18-38-58
#open 2017-02-22-05-02-14
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p request_type client service success error_msg from till cipher forwardable renewable client_cert_subject client_cert_fuid server_cert_subject server_cert_fuid auth_ticket new_ticket
#types time string addr port addr port string string string bool string time time string bool bool string string string string string string
1429583645.478441 CHhAvVGS1DHFjwGM9 192.168.1.31 64889 192.168.1.32 88 TGS vladg/VLADG.NET krbtgt/VLADG.NET T - - 0.000000 aes256-cts-hmac-sha1-96 T F - - - - ea0f395c3823e8cc7216d82e2405b428 fa8064009b03606f84e475feca5dcd12
#close 2017-02-18-18-38-58
1429583645.478441 CHhAvVGS1DHFjwGM9 192.168.1.31 64889 192.168.1.32 88 TGS vladg/VLADG.NET krbtgt/VLADG.NET T - - 0.000000 aes256-cts-hmac-sha1-96 T F - - - - a09fbd89918320cc12a26d4f0c4e6aa2 396a9d9e8975cc5024a83c6e86101f06
#close 2017-02-22-05-02-14

View file

@ -12,6 +12,11 @@
@TEST-START-FILE common.bro
global quit_receiver: event();
global quit_sender: event();
module Test;
export {
@ -22,14 +27,12 @@ export {
nolog: string &default="no";
num: count &log;
};
global log_test: event(rec: Test::Info);
}
event bro_init() &priority=5
{
Broker::enable();
Log::create_stream(Test::LOG, [$columns=Test::Info, $ev=log_test]);
Log::create_stream(Test::LOG, [$columns=Test::Info]);
}
@TEST-END-FILE
@ -42,19 +45,18 @@ redef exit_only_after_terminate = T;
event bro_init()
{
Broker::subscribe_to_logs("bro/log/");
Broker::subscribe_to_events("bro/event/");
Broker::listen(broker_port, "127.0.0.1");
}
event Test::log_test(rec: Test::Info)
event quit_receiver()
{
print "wrote log", rec;
if ( rec$num == 5 )
terminate();
}
@TEST-END-FILE
@TEST-START-FILE send.bro
const broker_port: port &redef;
@ -63,6 +65,7 @@ redef exit_only_after_terminate = T;
event bro_init()
{
Broker::enable_remote_logs(Test::LOG);
Broker::publish_topic("bro/event/");
Broker::connect("127.0.0.1", broker_port, 1secs);
}
@ -71,7 +74,11 @@ global n = 0;
event do_write()
{
if ( n == 6 )
return;
{
local args = Broker::event_args(quit_receiver);
Broker::send_event("bro/event/", args);
schedule 1sec { quit_sender() };
}
else
{
Log::write(Test::LOG, [$msg = "ping", $num = n]);
@ -80,6 +87,11 @@ event do_write()
}
}
event quit_sender()
{
terminate();
}
event Broker::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)

View file

@ -0,0 +1,134 @@
# @TEST-SERIALIZE: brokercomm
# @TEST-REQUIRES: grep -q ENABLE_BROKER:BOOL=true $BUILD/CMakeCache.txt
# @TEST-EXEC: btest-bg-run recv "bro -b ../common.bro ../recv.bro broker_port=$BROKER_PORT >recv.out"
# @TEST-EXEC: btest-bg-run send "bro -b ../common.bro ../send.bro broker_port=$BROKER_PORT >send.out"
# @TEST-EXEC: btest-bg-wait 20
# @TEST-EXEC: btest-diff recv/recv.out
# @TEST-EXEC: btest-diff recv/test.log
# @TEST-EXEC: btest-diff send/send.out
# @TEST-EXEC: btest-diff send/test.log
@TEST-START-FILE common.bro
global quit_receiver: event();
global quit_sender: event();
module Test;
export {
redef enum Log::ID += { LOG };
type Info: record {
b: bool;
i: int;
e: Log::ID;
c: count;
p: port;
sn: subnet;
a: addr;
d: double;
t: time;
iv: interval;
s: string;
sc: set[count];
ss: set[string];
se: set[string];
vc: vector of count;
ve: vector of string;
f: function(i: count) : string;
} &log;
}
event bro_init() &priority=5
{
Broker::enable();
Log::create_stream(Test::LOG, [$columns=Test::Info]);
}
@TEST-END-FILE
@TEST-START-FILE recv.bro
const broker_port: port &redef;
redef exit_only_after_terminate = T;
event bro_init()
{
Broker::subscribe_to_logs("bro/log/");
Broker::subscribe_to_events("bro/event/");
Broker::listen(broker_port, "127.0.0.1");
}
event quit_receiver()
{
terminate();
}
@TEST-END-FILE
@TEST-START-FILE send.bro
const broker_port: port &redef;
redef exit_only_after_terminate = T;
event bro_init()
{
Broker::enable_remote_logs(Test::LOG);
Broker::publish_topic("bro/event/");
Broker::connect("127.0.0.1", broker_port, 1secs);
}
event quit_sender()
{
terminate();
}
function foo(i : count) : string
{
if ( i > 0 )
return "Foo";
else
return "Bar";
}
event Broker::outgoing_connection_established(peer_address: string,
peer_port: port,
peer_name: string)
{
print "Broker::outgoing_connection_established", peer_address, peer_port;
local empty_set: set[string];
local empty_vector: vector of string;
Log::write(Test::LOG, [
$b=T,
$i=-42,
$e=Test::LOG,
$c=21,
$p=123/tcp,
$sn=10.0.0.1/24,
$a=1.2.3.4,
$d=3.14,
$t=network_time(),
$iv=100secs,
$s="hurz",
$sc=set(1), # set(1,2,3,4), # Output not stable for multi-element sets.
$ss=set("AA"), # set("AA", "BB", "CC") # Output not stable for multi-element sets.
$se=empty_set,
$vc=vector(10, 20, 30),
$ve=empty_vector,
$f=foo
]);
local args = Broker::event_args(quit_receiver);
Broker::send_event("bro/event/", args);
schedule 1sec { quit_sender() };
}
@TEST-END-FILE

View file

@ -4,6 +4,7 @@
# @TEST-REQUIRES: test -e $BUILD/aux/broccoli/bindings/broccoli-python/_broccoli_intern.so
#
# @TEST-EXEC: btest-bg-run bro bro %INPUT $DIST/aux/broccoli/bindings/broccoli-python/tests/test.bro
# @TEST-EXEC: sleep 2
# @TEST-EXEC: btest-bg-run python PYTHONPATH=$DIST/aux/broccoli/bindings/broccoli-python/:$BUILD/aux/broccoli/bindings/broccoli-python python $DIST/aux/broccoli/bindings/broccoli-python/tests/test.py
# @TEST-EXEC: btest-bg-wait -k 20
# @TEST-EXEC: btest-diff bro/.stdout