broker integration: add remote events

This commit is contained in:
Jon Siwek 2015-01-15 15:45:08 -06:00
parent 1e462481dc
commit 7e563b7275
7 changed files with 800 additions and 2 deletions

View file

@ -10,4 +10,13 @@ export {
peers: bool &default = T; peers: bool &default = T;
unsolicited: bool &default = F; unsolicited: bool &default = F;
}; };
type Data: record {
d: opaque of Comm::Data &optional;
};
type EventArgs: record {
name: string &optional; # nil for invalid event/args.
args: vector of Comm::Data;
};
} }

View file

@ -6,6 +6,7 @@ include_directories(BEFORE
) )
set(comm_SRCS set(comm_SRCS
Data.cc
Manager.cc Manager.cc
) )

533
src/comm/Data.cc Normal file
View file

@ -0,0 +1,533 @@
#include "Data.h"
#include "comm/comm.bif.h"
using namespace std;
OpaqueType* comm::opaque_of_data_type;
static broker::port::protocol to_broker_port_proto(TransportProto tp)
{
switch ( tp ) {
case TRANSPORT_TCP:
return broker::port::protocol::tcp;
case TRANSPORT_UDP:
return broker::port::protocol::udp;
case TRANSPORT_ICMP:
return broker::port::protocol::icmp;
case TRANSPORT_UNKNOWN:
default:
return broker::port::protocol::unknown;
}
}
static TransportProto to_bro_port_proto(broker::port::protocol tp)
{
switch ( tp ) {
case broker::port::protocol::tcp:
return TRANSPORT_TCP;
case broker::port::protocol::udp:
return TRANSPORT_UDP;
case broker::port::protocol::icmp:
return TRANSPORT_ICMP;
case broker::port::protocol::unknown:
default:
return TRANSPORT_UNKNOWN;
}
}
struct val_converter {
using result_type = Val*;
BroType* type;
result_type operator()(bool a)
{
if ( type->Tag() == TYPE_BOOL )
return new Val(a, TYPE_BOOL);
return nullptr;
}
result_type operator()(uint64_t a)
{
if ( type->Tag() == TYPE_COUNT )
return new Val(a, TYPE_COUNT);
if ( type->Tag() == TYPE_COUNTER )
return new Val(a, TYPE_COUNTER);
return nullptr;
}
result_type operator()(int64_t a)
{
if ( type->Tag() == TYPE_INT )
return new Val(a, TYPE_INT);
return nullptr;
}
result_type operator()(double a)
{
if ( type->Tag() == TYPE_DOUBLE )
return new Val(a, TYPE_DOUBLE);
return nullptr;
}
result_type operator()(const std::string& a)
{
switch ( type->Tag() ) {
case TYPE_STRING:
return new StringVal(a.size(), a.data());
case TYPE_FILE:
{
auto file = BroFile::GetFile(a.data());
if ( file )
{
Ref(file);
return new Val(file);
}
return nullptr;
}
case TYPE_FUNC:
{
auto id = lookup_ID(a.data(), GLOBAL_MODULE_NAME);
auto rval = id ? id->ID_Val() : nullptr;
Unref(id);
if ( rval && rval->Type()->Tag() == TYPE_FUNC )
return rval;
return nullptr;
}
default:
return nullptr;
}
}
result_type operator()(const broker::address& a)
{
if ( type->Tag() == TYPE_ADDR )
{
auto bits = reinterpret_cast<const in6_addr*>(&a.bytes());
return new AddrVal(IPAddr(*bits));
}
return nullptr;
}
result_type operator()(const broker::subnet& a)
{
if ( type->Tag() == TYPE_SUBNET )
{
auto bits = reinterpret_cast<const in6_addr*>(&a.network().bytes());
return new SubNetVal(IPPrefix(IPAddr(*bits), a.length()));
}
return nullptr;
}
result_type operator()(const broker::port& a)
{
if ( type->Tag() == TYPE_PORT )
return new PortVal(a.number(), to_bro_port_proto(a.type()));
return nullptr;
}
result_type operator()(const broker::time_point& a)
{
if ( type->Tag() == TYPE_TIME )
return new Val(a.value, TYPE_TIME);
return nullptr;
}
result_type operator()(const broker::time_duration& a)
{
if ( type->Tag() == TYPE_INTERVAL )
return new Val(a.value, TYPE_INTERVAL);
return nullptr;
}
result_type operator()(const broker::enum_value& a)
{
if ( type->Tag() == TYPE_ENUM )
{
auto etype = type->AsEnumType();
auto i = etype->Lookup(GLOBAL_MODULE_NAME, a.name.data());
if ( i == -1 )
return nullptr;
return new EnumVal(i, etype);
}
return nullptr;
}
result_type operator()(broker::set& a)
{
if ( ! type->IsSet() )
return nullptr;
auto tt = type->AsTableType();
auto rval = new TableVal(tt);
for ( auto& item : a )
{
auto indices = broker::get<broker::vector>(item);
if ( ! indices )
{
Unref(rval);
return nullptr;
}
auto expected_index_types = tt->Indices()->Types();
if ( expected_index_types->length() != indices->size() )
{
Unref(rval);
return nullptr;
}
auto list_val = new ListVal(TYPE_ANY);
for ( auto i = 0u; i < indices->size(); ++i )
{
auto index_val = comm::data_to_val(move((*indices)[i]),
(*expected_index_types)[i]);
if ( ! index_val )
{
Unref(rval);
Unref(list_val);
return nullptr;
}
list_val->Append(index_val);
}
rval->Assign(list_val, nullptr);
Unref(list_val);
}
return rval;
}
result_type operator()(broker::table& a)
{
if ( ! type->IsTable() )
return nullptr;
auto tt = type->AsTableType();
auto rval = new TableVal(tt);
for ( auto& item : a )
{
auto indices = broker::get<broker::vector>(item.first);
if ( ! indices )
{
Unref(rval);
return nullptr;
}
auto expected_index_types = tt->Indices()->Types();
if ( expected_index_types->length() != indices->size() )
{
Unref(rval);
return nullptr;
}
auto list_val = new ListVal(TYPE_ANY);
for ( auto i = 0u; i < indices->size(); ++i )
{
auto index_val = comm::data_to_val(move((*indices)[i]),
(*expected_index_types)[i]);
if ( ! index_val )
{
Unref(rval);
Unref(list_val);
return nullptr;
}
list_val->Append(index_val);
}
auto value_val = comm::data_to_val(move(item.second),
tt->YieldType());
if ( ! value_val )
{
Unref(rval);
Unref(list_val);
return nullptr;
}
rval->Assign(list_val, value_val);
Unref(list_val);
}
return rval;
}
result_type operator()(broker::vector& a)
{
if ( type->Tag() != TYPE_VECTOR )
return nullptr;
auto vt = type->AsVectorType();
auto rval = new VectorVal(vt);
for ( auto& item : a )
{
auto item_val = comm::data_to_val(move(item), vt->YieldType());
if ( ! item_val )
{
Unref(rval);
return nullptr;
}
rval->Assign(rval->Size(), item_val);
}
return rval;
}
result_type operator()(broker::record& a)
{
if ( type->Tag() != TYPE_RECORD )
return nullptr;
auto rt = type->AsRecordType();
if ( a.fields.size() != rt->NumFields() )
return nullptr;
auto rval = new RecordVal(rt);
for ( auto i = 0u; i < a.fields.size(); ++i )
{
if ( ! a.fields[i] )
{
rval->Assign(i, nullptr);
continue;
}
auto item_val = comm::data_to_val(move(*a.fields[i]),
rt->FieldType(i));
if ( ! item_val )
{
Unref(rval);
return nullptr;
}
rval->Assign(i, item_val);
}
return nullptr;
}
};
Val* comm::data_to_val(broker::data d, BroType* type)
{
return broker::visit(val_converter{type}, d);
}
broker::util::optional<broker::data> comm::val_to_data(const Val* v)
{
switch ( v->Type()->Tag() ) {
case TYPE_BOOL:
return {v->AsBool()};
case TYPE_INT:
return {v->AsInt()};
case TYPE_COUNT:
return {v->AsCount()};
case TYPE_COUNTER:
return {v->AsCounter()};
case TYPE_PORT:
{
auto p = v->AsPortVal();
return {broker::port(p->Port(), to_broker_port_proto(p->PortType()))};
}
case TYPE_ADDR:
{
auto a = v->AsAddr();
in6_addr tmp;
a.CopyIPv6(&tmp);
return {broker::address(reinterpret_cast<const uint32_t*>(&tmp),
broker::address::family::ipv6,
broker::address::byte_order::network)};
}
break;
case TYPE_SUBNET:
{
auto s = v->AsSubNet();
in6_addr tmp;
s.Prefix().CopyIPv6(&tmp);
auto a = broker::address(reinterpret_cast<const uint32_t*>(&tmp),
broker::address::family::ipv6,
broker::address::byte_order::network);
return {broker::subnet(a, s.Length())};
}
break;
case TYPE_DOUBLE:
return {v->AsDouble()};
case TYPE_TIME:
return {broker::time_point(v->AsTime())};
case TYPE_INTERVAL:
return {broker::time_duration(v->AsInterval())};
case TYPE_ENUM:
{
auto enum_type = v->Type()->AsEnumType();
auto enum_name = enum_type->Lookup(v->AsEnum());
return {broker::enum_value(enum_name ? "<unknown enum>" : enum_name)};
}
case TYPE_STRING:
{
auto s = v->AsString();
return {string(reinterpret_cast<const char*>(s->Bytes()), s->Len())};
}
case TYPE_FILE:
return {string(v->AsFile()->Name())};
case TYPE_FUNC:
return {string(v->AsFunc()->Name())};
case TYPE_TABLE:
{
auto is_set = v->Type()->IsSet();
auto table = v->AsTable();
auto table_val = v->AsTableVal();
auto c = table->InitForIteration();
broker::data rval;
if ( is_set )
rval = broker::set();
else
rval = broker::table();
struct iter_guard {
iter_guard(HashKey* arg_k, ListVal* arg_lv)
: k(arg_k), lv(arg_lv)
{}
~iter_guard()
{
delete k;
Unref(lv);
}
HashKey* k;
ListVal* lv;
};
for ( auto i = 0; i < table->Length(); ++i )
{
HashKey* k;
auto entry = table->NextEntry(k, c);
auto vl = table_val->RecoverIndex(k);
iter_guard ig(k, vl);
broker::vector key;
for ( auto k = 0; k < vl->Length(); ++k )
{
auto key_part = val_to_data((*vl->Vals())[k]);
if ( ! key_part )
return {};
key.emplace_back(move(*key_part));
}
if ( is_set )
broker::get<broker::set>(rval)->emplace(move(key));
else
{
auto val = val_to_data(entry->Value());
if ( ! val )
return {};
broker::get<broker::table>(rval)->emplace(move(key),
move(*val));
}
}
return {rval};
}
case TYPE_VECTOR:
{
auto vec = v->AsVectorVal();
broker::vector rval;
rval.reserve(vec->Size());
for ( auto i = 0u; i < vec->Size(); ++i )
{
auto item_val = vec->Lookup(i);
if ( ! item_val )
continue;
auto item = val_to_data(item_val);
if ( ! item )
return {};
rval.emplace_back(move(*item));
}
return {rval};
}
case TYPE_RECORD:
{
auto rec = v->AsRecordVal();
broker::record rval;
auto num_fields = v->Type()->AsRecordType()->NumFields();
rval.fields.reserve(num_fields);
for ( auto i = 0u; i < num_fields; ++i )
{
auto item_val = rec->LookupWithDefault(i);
if ( ! item_val )
{
rval.fields.emplace_back(broker::record::field{});
continue;
}
auto item = val_to_data(item_val);
Unref(item_val);
if ( ! item )
return {};
rval.fields.emplace_back(broker::record::field{move(*item)});
}
return {rval};
}
default:
reporter->Error("unsupported Comm::Data type: %s",
type_name(v->Type()->Tag()));
break;
}
return {};
}
RecordVal* comm::make_data_val(const Val* v)
{
auto rval = new RecordVal(BifType::Record::Comm::Data);
auto data = val_to_data(v);
if ( data )
rval->Assign(0, new DataVal(move(*data)));
return rval;
}

29
src/comm/Data.h Normal file
View file

@ -0,0 +1,29 @@
#ifndef BRO_COMM_DATA_H
#define BRO_COMM_DATA_H
#include <broker/data.hh>
#include "Val.h"
namespace comm {
extern OpaqueType* opaque_of_data_type;
RecordVal* make_data_val(const Val* v);
broker::util::optional<broker::data> val_to_data(const Val* v);
Val* data_to_val(broker::data d, BroType* type);
class DataVal : public OpaqueVal {
public:
DataVal(broker::data arg_data)
: OpaqueVal(comm::opaque_of_data_type), data(std::move(arg_data))
{}
broker::data data;
};
} // namespace comm
#endif // BRO_COMM_DATA_H

View file

@ -1,4 +1,5 @@
#include "Manager.h" #include "Manager.h"
#include "Data.h"
#include <broker/broker.hh> #include <broker/broker.hh>
#include <cstdio> #include <cstdio>
#include <unistd.h> #include <unistd.h>
@ -32,6 +33,9 @@ bool comm::Manager::InitPostScript()
send_flags_peers_idx = require_field(send_flags_type, "peers"); send_flags_peers_idx = require_field(send_flags_type, "peers");
send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited"); send_flags_unsolicited_idx = require_field(send_flags_type, "unsolicited");
comm::opaque_of_data_type = new OpaqueType("Comm::Data");
vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref());
auto res = broker::init(); auto res = broker::init();
if ( res ) if ( res )
@ -103,6 +107,96 @@ bool comm::Manager::Print(string topic, string msg, const Val* flags)
return true; return true;
} }
bool comm::Manager::Event(std::string topic, const RecordVal* args,
const Val* flags)
{
if ( ! args->Lookup(0) )
return false;
auto event_name = args->Lookup(0)->AsString()->CheckString();
auto vv = args->Lookup(1)->AsVectorVal();
broker::message msg;
msg.reserve(vv->Size() + 1);
msg.emplace_back(event_name);
for ( auto i = 0u; i < vv->Size(); ++i )
{
auto val = vv->Lookup(i)->AsRecordVal()->Lookup(0);
auto data_val = dynamic_cast<DataVal*>(val);
msg.emplace_back(data_val->data);
}
endpoint->send(move(topic), move(msg), get_flags(flags));
return true;
}
RecordVal* comm::Manager::MakeEventArgs(const val_list* args)
{
auto rval = new RecordVal(BifType::Record::Comm::EventArgs);
auto arg_vec = new VectorVal(vector_of_data_type);
rval->Assign(1, arg_vec);
const Func* func;
for ( auto i = 0u; i < args->length(); ++i )
{
auto arg_val = (*args)[i];
if ( i == 0 )
{
// Event val must come first.
if ( arg_val->Type()->Tag() != TYPE_FUNC )
{
reporter->Error("1st param of Comm::event_args must be event");
return rval;
}
func = arg_val->AsFunc();
if ( func->Flavor() != FUNC_FLAVOR_EVENT )
{
reporter->Error("1st param of Comm::event_args must be event");
return rval;
}
auto num_args = func->FType()->Args()->NumFields();
if ( num_args != args->length() - 1 )
{
reporter->Error("bad # of Comm::event_args: got %d, expect %d",
args->length(), num_args + 1);
return rval;
}
rval->Assign(0, new StringVal(func->Name()));
continue;
}
auto expected_type = (*func->FType()->ArgTypes()->Types())[i - 1];
if ( ! same_type((*args)[i]->Type(), expected_type) )
{
rval->Assign(0, 0);
reporter->Error("Comm::event_args param %d type mismatch", i);
return rval;
}
auto data_val = make_data_val((*args)[i]);
if ( ! data_val->Lookup(0) )
{
Unref(data_val);
rval->Assign(0, 0);
reporter->Error("Comm::event_args unsupported event/params");
return rval;
}
arg_vec->Assign(i - 1, data_val);
}
return rval;
}
bool comm::Manager::SubscribeToPrints(string topic_prefix) bool comm::Manager::SubscribeToPrints(string topic_prefix)
{ {
auto& q = print_subscriptions[topic_prefix]; auto& q = print_subscriptions[topic_prefix];
@ -119,6 +213,22 @@ bool comm::Manager::UnsubscribeToPrints(const string& topic_prefix)
return print_subscriptions.erase(topic_prefix); return print_subscriptions.erase(topic_prefix);
} }
bool comm::Manager::SubscribeToEvents(string topic_prefix)
{
auto& q = event_subscriptions[topic_prefix];
if ( q )
return false;
q = broker::message_queue(move(topic_prefix), *endpoint);
return true;
}
bool comm::Manager::UnsubscribeToEvents(const string& topic_prefix)
{
return event_subscriptions.erase(topic_prefix);
}
int comm::Manager::get_flags(const Val* flags) int comm::Manager::get_flags(const Val* flags)
{ {
auto r = flags->AsRecordVal(); auto r = flags->AsRecordVal();
@ -149,6 +259,9 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
for ( const auto& ps : print_subscriptions ) for ( const auto& ps : print_subscriptions )
read->Insert(ps.second.fd()); read->Insert(ps.second.fd());
for ( const auto& ps : event_subscriptions )
read->Insert(ps.second.fd());
} }
double comm::Manager::NextTimestamp(double* local_network_time) double comm::Manager::NextTimestamp(double* local_network_time)
@ -251,5 +364,69 @@ void comm::Manager::Process()
} }
} }
for ( const auto& es : event_subscriptions )
{
auto event_messages = es.second.want_pop();
if ( event_messages.empty() )
continue;
idle = false;
for ( auto& em : event_messages )
{
if ( em.empty() )
{
reporter->Warning("got empty event message");
continue;
}
std::string* event_name = broker::get<std::string>(em[0]);
if ( ! event_name )
{
reporter->Warning("got event message w/o event name: %d",
static_cast<int>(broker::which(em[0])));
continue;
}
EventHandlerPtr ehp = event_registry->Lookup(event_name->data());
if ( ! ehp )
continue;
auto arg_types = ehp->FType()->ArgTypes()->Types();
if ( arg_types->length() != em.size() - 1 )
{
reporter->Warning("got event message with invalid # of args,"
" got %zd, expected %d", em.size() - 1,
arg_types->length());
continue;
}
val_list* vl = new val_list;
for ( auto i = 1u; i < em.size(); ++i )
{
auto val = data_to_val(move(em[i]), (*arg_types)[i - 1]);
if ( val )
vl->append(val);
else
{
reporter->Warning("failed to convert remote event arg # %d",
i - 1);
break;
}
}
if ( vl->length() == em.size() - 1 )
mgr.QueueEvent(ehp, vl);
else
delete_vals(vl);
}
}
SetIdle(idle); SetIdle(idle);
} }

View file

@ -31,10 +31,18 @@ public:
bool Print(std::string topic, std::string msg, const Val* flags); bool Print(std::string topic, std::string msg, const Val* flags);
bool Event(std::string topic, const RecordVal* args, const Val* flags);
RecordVal* MakeEventArgs(const val_list* args);
bool SubscribeToPrints(std::string topic_prefix); bool SubscribeToPrints(std::string topic_prefix);
bool UnsubscribeToPrints(const std::string& topic_prefix); bool UnsubscribeToPrints(const std::string& topic_prefix);
bool SubscribeToEvents(std::string topic_prefix);
bool UnsubscribeToEvents(const std::string& topic_prefix);
private: private:
int get_flags(const Val* flags); int get_flags(const Val* flags);
@ -53,10 +61,13 @@ private:
std::unique_ptr<broker::endpoint> endpoint; std::unique_ptr<broker::endpoint> endpoint;
std::map<std::pair<std::string, uint16_t>, broker::peering> peers; std::map<std::pair<std::string, uint16_t>, broker::peering> peers;
std::map<std::string, broker::message_queue> print_subscriptions; std::map<std::string, broker::message_queue> print_subscriptions;
std::map<std::string, broker::message_queue> event_subscriptions;
int send_flags_self_idx; int send_flags_self_idx;
int send_flags_peers_idx; int send_flags_peers_idx;
int send_flags_unsolicited_idx; int send_flags_unsolicited_idx;
VectorType* vector_of_data_type;
}; };
} // namespace comm } // namespace comm

View file

@ -1,17 +1,29 @@
%%{ %%{
#include "comm/Manager.h" #include "comm/Manager.h"
#include "comm/Data.h"
%%} %%}
module Comm; module Comm;
type Comm::SendFlags: record; type Comm::SendFlags: record;
type Comm::Data: record;
type Comm::EventArgs: record;
function Comm::data%(d: any%): Comm::Data
%{
return comm::make_data_val(d);
%}
event Comm::remote_connection_established%(peer_address: string, event Comm::remote_connection_established%(peer_address: string,
peer_port: port, peer_port: port,
peer_name: string%); peer_name: string%);
event Comm::remote_connection_broken%(peer_address: string, event Comm::remote_connection_broken%(peer_address: string,
peer_port: port%); peer_port: port%);
event Comm::remote_connection_incompatible%(peer_address: string, event Comm::remote_connection_incompatible%(peer_address: string,
peer_port: port%); peer_port: port%);
@ -62,14 +74,40 @@ function Comm::print%(topic: string, msg: string,
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
function Comm::subscribe_to_prints%(topic_prefix: string &default = ""%): bool function Comm::subscribe_to_prints%(topic_prefix: string%): bool
%{ %{
auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString()); auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
function Comm::unsubscribe_to_prints%(topic_prefix: string &default = ""%): bool function Comm::unsubscribe_to_prints%(topic_prefix: string%): bool
%{ %{
auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString()); auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL); return new Val(rval, TYPE_BOOL);
%} %}
function Comm::event_args%(...%): Comm::EventArgs
%{
auto rval = comm_mgr->MakeEventArgs(@ARGS@);
return rval;
%}
function Comm::event%(topic: string, args: Comm::EventArgs,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->Event(topic->CheckString(), args->AsRecordVal(),
flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_events%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_events%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}