diff --git a/scripts/base/frameworks/comm/main.bro b/scripts/base/frameworks/comm/main.bro index c69d36db52..efe3069a1c 100644 --- a/scripts/base/frameworks/comm/main.bro +++ b/scripts/base/frameworks/comm/main.bro @@ -10,4 +10,13 @@ export { peers: bool &default = T; unsolicited: bool &default = F; }; + + type Data: record { + d: opaque of Comm::Data &optional; + }; + + type EventArgs: record { + name: string &optional; # nil for invalid event/args. + args: vector of Comm::Data; + }; } diff --git a/src/comm/CMakeLists.txt b/src/comm/CMakeLists.txt index c152adc49a..95ad701d71 100644 --- a/src/comm/CMakeLists.txt +++ b/src/comm/CMakeLists.txt @@ -6,6 +6,7 @@ include_directories(BEFORE ) set(comm_SRCS + Data.cc Manager.cc ) diff --git a/src/comm/Data.cc b/src/comm/Data.cc new file mode 100644 index 0000000000..58d5b30085 --- /dev/null +++ b/src/comm/Data.cc @@ -0,0 +1,533 @@ +#include "Data.h" +#include "comm/comm.bif.h" + +using namespace std; + +OpaqueType* comm::opaque_of_data_type; + +static broker::port::protocol to_broker_port_proto(TransportProto tp) + { + switch ( tp ) { + case TRANSPORT_TCP: + return broker::port::protocol::tcp; + case TRANSPORT_UDP: + return broker::port::protocol::udp; + case TRANSPORT_ICMP: + return broker::port::protocol::icmp; + case TRANSPORT_UNKNOWN: + default: + return broker::port::protocol::unknown; + } + } + +static TransportProto to_bro_port_proto(broker::port::protocol tp) + { + switch ( tp ) { + case broker::port::protocol::tcp: + return TRANSPORT_TCP; + case broker::port::protocol::udp: + return TRANSPORT_UDP; + case broker::port::protocol::icmp: + return TRANSPORT_ICMP; + case broker::port::protocol::unknown: + default: + return TRANSPORT_UNKNOWN; + } + } + +struct val_converter { + using result_type = Val*; + + BroType* type; + + result_type operator()(bool a) + { + if ( type->Tag() == TYPE_BOOL ) + return new Val(a, TYPE_BOOL); + return nullptr; + } + + result_type operator()(uint64_t a) + { + if ( type->Tag() == TYPE_COUNT ) + return new Val(a, TYPE_COUNT); + if ( type->Tag() == TYPE_COUNTER ) + return new Val(a, TYPE_COUNTER); + return nullptr; + } + + result_type operator()(int64_t a) + { + if ( type->Tag() == TYPE_INT ) + return new Val(a, TYPE_INT); + return nullptr; + } + + result_type operator()(double a) + { + if ( type->Tag() == TYPE_DOUBLE ) + return new Val(a, TYPE_DOUBLE); + return nullptr; + } + + result_type operator()(const std::string& a) + { + switch ( type->Tag() ) { + case TYPE_STRING: + return new StringVal(a.size(), a.data()); + case TYPE_FILE: + { + auto file = BroFile::GetFile(a.data()); + + if ( file ) + { + Ref(file); + return new Val(file); + } + + return nullptr; + } + case TYPE_FUNC: + { + auto id = lookup_ID(a.data(), GLOBAL_MODULE_NAME); + auto rval = id ? id->ID_Val() : nullptr; + Unref(id); + + if ( rval && rval->Type()->Tag() == TYPE_FUNC ) + return rval; + + return nullptr; + } + default: + return nullptr; + } + } + + result_type operator()(const broker::address& a) + { + if ( type->Tag() == TYPE_ADDR ) + { + auto bits = reinterpret_cast(&a.bytes()); + return new AddrVal(IPAddr(*bits)); + } + + return nullptr; + } + + result_type operator()(const broker::subnet& a) + { + if ( type->Tag() == TYPE_SUBNET ) + { + auto bits = reinterpret_cast(&a.network().bytes()); + return new SubNetVal(IPPrefix(IPAddr(*bits), a.length())); + } + + return nullptr; + } + + result_type operator()(const broker::port& a) + { + if ( type->Tag() == TYPE_PORT ) + return new PortVal(a.number(), to_bro_port_proto(a.type())); + + return nullptr; + } + + result_type operator()(const broker::time_point& a) + { + if ( type->Tag() == TYPE_TIME ) + return new Val(a.value, TYPE_TIME); + + return nullptr; + } + + result_type operator()(const broker::time_duration& a) + { + if ( type->Tag() == TYPE_INTERVAL ) + return new Val(a.value, TYPE_INTERVAL); + + return nullptr; + } + + result_type operator()(const broker::enum_value& a) + { + if ( type->Tag() == TYPE_ENUM ) + { + auto etype = type->AsEnumType(); + auto i = etype->Lookup(GLOBAL_MODULE_NAME, a.name.data()); + + if ( i == -1 ) + return nullptr; + + return new EnumVal(i, etype); + } + + return nullptr; + } + + result_type operator()(broker::set& a) + { + if ( ! type->IsSet() ) + return nullptr; + + auto tt = type->AsTableType(); + auto rval = new TableVal(tt); + + for ( auto& item : a ) + { + auto indices = broker::get(item); + + if ( ! indices ) + { + Unref(rval); + return nullptr; + } + + auto expected_index_types = tt->Indices()->Types(); + + if ( expected_index_types->length() != indices->size() ) + { + Unref(rval); + return nullptr; + } + + auto list_val = new ListVal(TYPE_ANY); + + for ( auto i = 0u; i < indices->size(); ++i ) + { + auto index_val = comm::data_to_val(move((*indices)[i]), + (*expected_index_types)[i]); + + if ( ! index_val ) + { + Unref(rval); + Unref(list_val); + return nullptr; + } + + list_val->Append(index_val); + } + + + rval->Assign(list_val, nullptr); + Unref(list_val); + } + + return rval; + } + + result_type operator()(broker::table& a) + { + if ( ! type->IsTable() ) + return nullptr; + + auto tt = type->AsTableType(); + auto rval = new TableVal(tt); + + for ( auto& item : a ) + { + auto indices = broker::get(item.first); + + if ( ! indices ) + { + Unref(rval); + return nullptr; + } + + auto expected_index_types = tt->Indices()->Types(); + + if ( expected_index_types->length() != indices->size() ) + { + Unref(rval); + return nullptr; + } + + auto list_val = new ListVal(TYPE_ANY); + + for ( auto i = 0u; i < indices->size(); ++i ) + { + auto index_val = comm::data_to_val(move((*indices)[i]), + (*expected_index_types)[i]); + + if ( ! index_val ) + { + Unref(rval); + Unref(list_val); + return nullptr; + } + + list_val->Append(index_val); + } + + auto value_val = comm::data_to_val(move(item.second), + tt->YieldType()); + + if ( ! value_val ) + { + Unref(rval); + Unref(list_val); + return nullptr; + } + + rval->Assign(list_val, value_val); + Unref(list_val); + } + + return rval; + } + + result_type operator()(broker::vector& a) + { + if ( type->Tag() != TYPE_VECTOR ) + return nullptr; + + auto vt = type->AsVectorType(); + auto rval = new VectorVal(vt); + + for ( auto& item : a ) + { + auto item_val = comm::data_to_val(move(item), vt->YieldType()); + + if ( ! item_val ) + { + Unref(rval); + return nullptr; + } + + rval->Assign(rval->Size(), item_val); + } + + return rval; + } + + result_type operator()(broker::record& a) + { + if ( type->Tag() != TYPE_RECORD ) + return nullptr; + + auto rt = type->AsRecordType(); + + if ( a.fields.size() != rt->NumFields() ) + return nullptr; + + auto rval = new RecordVal(rt); + + for ( auto i = 0u; i < a.fields.size(); ++i ) + { + if ( ! a.fields[i] ) + { + rval->Assign(i, nullptr); + continue; + } + + auto item_val = comm::data_to_val(move(*a.fields[i]), + rt->FieldType(i)); + + if ( ! item_val ) + { + Unref(rval); + return nullptr; + } + + rval->Assign(i, item_val); + } + + return nullptr; + } +}; + +Val* comm::data_to_val(broker::data d, BroType* type) + { + return broker::visit(val_converter{type}, d); + } + +broker::util::optional comm::val_to_data(const Val* v) + { + switch ( v->Type()->Tag() ) { + case TYPE_BOOL: + return {v->AsBool()}; + case TYPE_INT: + return {v->AsInt()}; + case TYPE_COUNT: + return {v->AsCount()}; + case TYPE_COUNTER: + return {v->AsCounter()}; + case TYPE_PORT: + { + auto p = v->AsPortVal(); + return {broker::port(p->Port(), to_broker_port_proto(p->PortType()))}; + } + case TYPE_ADDR: + { + auto a = v->AsAddr(); + in6_addr tmp; + a.CopyIPv6(&tmp); + return {broker::address(reinterpret_cast(&tmp), + broker::address::family::ipv6, + broker::address::byte_order::network)}; + } + break; + case TYPE_SUBNET: + { + auto s = v->AsSubNet(); + in6_addr tmp; + s.Prefix().CopyIPv6(&tmp); + auto a = broker::address(reinterpret_cast(&tmp), + broker::address::family::ipv6, + broker::address::byte_order::network); + return {broker::subnet(a, s.Length())}; + } + break; + case TYPE_DOUBLE: + return {v->AsDouble()}; + case TYPE_TIME: + return {broker::time_point(v->AsTime())}; + case TYPE_INTERVAL: + return {broker::time_duration(v->AsInterval())}; + case TYPE_ENUM: + { + auto enum_type = v->Type()->AsEnumType(); + auto enum_name = enum_type->Lookup(v->AsEnum()); + return {broker::enum_value(enum_name ? "" : enum_name)}; + } + case TYPE_STRING: + { + auto s = v->AsString(); + return {string(reinterpret_cast(s->Bytes()), s->Len())}; + } + case TYPE_FILE: + return {string(v->AsFile()->Name())}; + case TYPE_FUNC: + return {string(v->AsFunc()->Name())}; + case TYPE_TABLE: + { + auto is_set = v->Type()->IsSet(); + auto table = v->AsTable(); + auto table_val = v->AsTableVal(); + auto c = table->InitForIteration(); + broker::data rval; + + if ( is_set ) + rval = broker::set(); + else + rval = broker::table(); + + struct iter_guard { + iter_guard(HashKey* arg_k, ListVal* arg_lv) + : k(arg_k), lv(arg_lv) + {} + + ~iter_guard() + { + delete k; + Unref(lv); + } + + HashKey* k; + ListVal* lv; + }; + + for ( auto i = 0; i < table->Length(); ++i ) + { + HashKey* k; + auto entry = table->NextEntry(k, c); + auto vl = table_val->RecoverIndex(k); + iter_guard ig(k, vl); + broker::vector key; + + for ( auto k = 0; k < vl->Length(); ++k ) + { + auto key_part = val_to_data((*vl->Vals())[k]); + + if ( ! key_part ) + return {}; + + key.emplace_back(move(*key_part)); + } + + if ( is_set ) + broker::get(rval)->emplace(move(key)); + else + { + auto val = val_to_data(entry->Value()); + + if ( ! val ) + return {}; + + broker::get(rval)->emplace(move(key), + move(*val)); + } + } + + return {rval}; + } + case TYPE_VECTOR: + { + auto vec = v->AsVectorVal(); + broker::vector rval; + rval.reserve(vec->Size()); + + for ( auto i = 0u; i < vec->Size(); ++i ) + { + auto item_val = vec->Lookup(i); + + if ( ! item_val ) + continue; + + auto item = val_to_data(item_val); + + if ( ! item ) + return {}; + + rval.emplace_back(move(*item)); + } + + return {rval}; + } + case TYPE_RECORD: + { + auto rec = v->AsRecordVal(); + broker::record rval; + auto num_fields = v->Type()->AsRecordType()->NumFields(); + rval.fields.reserve(num_fields); + + for ( auto i = 0u; i < num_fields; ++i ) + { + auto item_val = rec->LookupWithDefault(i); + + if ( ! item_val ) + { + rval.fields.emplace_back(broker::record::field{}); + continue; + } + + auto item = val_to_data(item_val); + Unref(item_val); + + if ( ! item ) + return {}; + + rval.fields.emplace_back(broker::record::field{move(*item)}); + } + + return {rval}; + } + default: + reporter->Error("unsupported Comm::Data type: %s", + type_name(v->Type()->Tag())); + break; + } + + return {}; + } + +RecordVal* comm::make_data_val(const Val* v) + { + auto rval = new RecordVal(BifType::Record::Comm::Data); + auto data = val_to_data(v); + + if ( data ) + rval->Assign(0, new DataVal(move(*data))); + + return rval; + } diff --git a/src/comm/Data.h b/src/comm/Data.h new file mode 100644 index 0000000000..e3197b61da --- /dev/null +++ b/src/comm/Data.h @@ -0,0 +1,29 @@ +#ifndef BRO_COMM_DATA_H +#define BRO_COMM_DATA_H + +#include +#include "Val.h" + +namespace comm { + +extern OpaqueType* opaque_of_data_type; + +RecordVal* make_data_val(const Val* v); + +broker::util::optional val_to_data(const Val* v); + +Val* data_to_val(broker::data d, BroType* type); + +class DataVal : public OpaqueVal { +public: + + DataVal(broker::data arg_data) + : OpaqueVal(comm::opaque_of_data_type), data(std::move(arg_data)) + {} + + broker::data data; +}; + +} // namespace comm + +#endif // BRO_COMM_DATA_H diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 7027daa79e..b4f118706a 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -1,4 +1,5 @@ #include "Manager.h" +#include "Data.h" #include #include #include @@ -32,6 +33,9 @@ bool comm::Manager::InitPostScript() send_flags_peers_idx = require_field(send_flags_type, "peers"); send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited"); + comm::opaque_of_data_type = new OpaqueType("Comm::Data"); + vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref()); + auto res = broker::init(); if ( res ) @@ -103,6 +107,96 @@ bool comm::Manager::Print(string topic, string msg, const Val* flags) return true; } +bool comm::Manager::Event(std::string topic, const RecordVal* args, + const Val* flags) + { + if ( ! args->Lookup(0) ) + return false; + + auto event_name = args->Lookup(0)->AsString()->CheckString(); + auto vv = args->Lookup(1)->AsVectorVal(); + broker::message msg; + msg.reserve(vv->Size() + 1); + msg.emplace_back(event_name); + + for ( auto i = 0u; i < vv->Size(); ++i ) + { + auto val = vv->Lookup(i)->AsRecordVal()->Lookup(0); + auto data_val = dynamic_cast(val); + msg.emplace_back(data_val->data); + } + + endpoint->send(move(topic), move(msg), get_flags(flags)); + return true; + } + +RecordVal* comm::Manager::MakeEventArgs(const val_list* args) + { + auto rval = new RecordVal(BifType::Record::Comm::EventArgs); + auto arg_vec = new VectorVal(vector_of_data_type); + rval->Assign(1, arg_vec); + const Func* func; + + for ( auto i = 0u; i < args->length(); ++i ) + { + auto arg_val = (*args)[i]; + + if ( i == 0 ) + { + // Event val must come first. + + if ( arg_val->Type()->Tag() != TYPE_FUNC ) + { + reporter->Error("1st param of Comm::event_args must be event"); + return rval; + } + + func = arg_val->AsFunc(); + + if ( func->Flavor() != FUNC_FLAVOR_EVENT ) + { + reporter->Error("1st param of Comm::event_args must be event"); + return rval; + } + + auto num_args = func->FType()->Args()->NumFields(); + + if ( num_args != args->length() - 1 ) + { + reporter->Error("bad # of Comm::event_args: got %d, expect %d", + args->length(), num_args + 1); + return rval; + } + + rval->Assign(0, new StringVal(func->Name())); + continue; + } + + auto expected_type = (*func->FType()->ArgTypes()->Types())[i - 1]; + + if ( ! same_type((*args)[i]->Type(), expected_type) ) + { + rval->Assign(0, 0); + reporter->Error("Comm::event_args param %d type mismatch", i); + return rval; + } + + auto data_val = make_data_val((*args)[i]); + + if ( ! data_val->Lookup(0) ) + { + Unref(data_val); + rval->Assign(0, 0); + reporter->Error("Comm::event_args unsupported event/params"); + return rval; + } + + arg_vec->Assign(i - 1, data_val); + } + + return rval; + } + bool comm::Manager::SubscribeToPrints(string topic_prefix) { auto& q = print_subscriptions[topic_prefix]; @@ -119,6 +213,22 @@ bool comm::Manager::UnsubscribeToPrints(const string& topic_prefix) return print_subscriptions.erase(topic_prefix); } +bool comm::Manager::SubscribeToEvents(string topic_prefix) + { + auto& q = event_subscriptions[topic_prefix]; + + if ( q ) + return false; + + q = broker::message_queue(move(topic_prefix), *endpoint); + return true; + } + +bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix) + { + return event_subscriptions.erase(topic_prefix); + } + int comm::Manager::get_flags(const Val* flags) { auto r = flags->AsRecordVal(); @@ -149,6 +259,9 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, for ( const auto& ps : print_subscriptions ) read->Insert(ps.second.fd()); + + for ( const auto& ps : event_subscriptions ) + read->Insert(ps.second.fd()); } double comm::Manager::NextTimestamp(double* local_network_time) @@ -251,5 +364,69 @@ void comm::Manager::Process() } } + for ( const auto& es : event_subscriptions ) + { + auto event_messages = es.second.want_pop(); + + if ( event_messages.empty() ) + continue; + + idle = false; + + for ( auto& em : event_messages ) + { + if ( em.empty() ) + { + reporter->Warning("got empty event message"); + continue; + } + + std::string* event_name = broker::get(em[0]); + + if ( ! event_name ) + { + reporter->Warning("got event message w/o event name: %d", + static_cast(broker::which(em[0]))); + continue; + } + + EventHandlerPtr ehp = event_registry->Lookup(event_name->data()); + + if ( ! ehp ) + continue; + + auto arg_types = ehp->FType()->ArgTypes()->Types(); + + if ( arg_types->length() != em.size() - 1 ) + { + reporter->Warning("got event message with invalid # of args," + " got %zd, expected %d", em.size() - 1, + arg_types->length()); + continue; + } + + val_list* vl = new val_list; + + for ( auto i = 1u; i < em.size(); ++i ) + { + auto val = data_to_val(move(em[i]), (*arg_types)[i - 1]); + + if ( val ) + vl->append(val); + else + { + reporter->Warning("failed to convert remote event arg # %d", + i - 1); + break; + } + } + + if ( vl->length() == em.size() - 1 ) + mgr.QueueEvent(ehp, vl); + else + delete_vals(vl); + } + } + SetIdle(idle); } diff --git a/src/comm/Manager.h b/src/comm/Manager.h index 0f7d5a4a1c..020f78a03b 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -31,10 +31,18 @@ public: bool Print(std::string topic, std::string msg, const Val* flags); + bool Event(std::string topic, const RecordVal* args, const Val* flags); + + RecordVal* MakeEventArgs(const val_list* args); + bool SubscribeToPrints(std::string topic_prefix); bool UnsubscribeToPrints(const std::string& topic_prefix); + bool SubscribeToEvents(std::string topic_prefix); + + bool UnsubscribeToEvents(const std::string& topic_prefix); + private: int get_flags(const Val* flags); @@ -53,10 +61,13 @@ private: std::unique_ptr endpoint; std::map, broker::peering> peers; std::map print_subscriptions; + std::map event_subscriptions; int send_flags_self_idx; int send_flags_peers_idx; int send_flags_unsolicited_idx; + + VectorType* vector_of_data_type; }; } // namespace comm diff --git a/src/comm/comm.bif b/src/comm/comm.bif index 6294864bba..fe405222cc 100644 --- a/src/comm/comm.bif +++ b/src/comm/comm.bif @@ -1,17 +1,29 @@ %%{ #include "comm/Manager.h" +#include "comm/Data.h" %%} module Comm; type Comm::SendFlags: record; +type Comm::Data: record; + +type Comm::EventArgs: record; + +function Comm::data%(d: any%): Comm::Data + %{ + return comm::make_data_val(d); + %} + event Comm::remote_connection_established%(peer_address: string, peer_port: port, peer_name: string%); + event Comm::remote_connection_broken%(peer_address: string, peer_port: port%); + event Comm::remote_connection_incompatible%(peer_address: string, peer_port: port%); @@ -62,14 +74,40 @@ function Comm::print%(topic: string, msg: string, return new Val(rval, TYPE_BOOL); %} -function Comm::subscribe_to_prints%(topic_prefix: string &default = ""%): bool +function Comm::subscribe_to_prints%(topic_prefix: string%): bool %{ auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} -function Comm::unsubscribe_to_prints%(topic_prefix: string &default = ""%): bool +function Comm::unsubscribe_to_prints%(topic_prefix: string%): bool %{ auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} + +function Comm::event_args%(...%): Comm::EventArgs + %{ + auto rval = comm_mgr->MakeEventArgs(@ARGS@); + return rval; + %} + +function Comm::event%(topic: string, args: Comm::EventArgs, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = comm_mgr->Event(topic->CheckString(), args->AsRecordVal(), + flags); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::subscribe_to_events%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::unsubscribe_to_events%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %}