broker integration: add API for connecting to peers

This commit is contained in:
Jon Siwek 2015-01-13 17:14:21 -06:00
parent 7120098ca2
commit 1e8d6cd917
8 changed files with 254 additions and 1 deletions

View file

@ -181,6 +181,8 @@ if ( ENABLE_BROKER )
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
add_subdirectory(aux/broker)
set(brodeps ${brodeps} broker)
add_definitions(-DENABLE_BROKER)
include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR}/aux/broker)
endif ()
add_subdirectory(src)

@ -1 +1 @@
Subproject commit a1b51def07cfb191d0a83a78c7102560740dbcb3
Subproject commit 331966d1f3d24c63bedbda79e477f759c4d267f9

View file

@ -161,6 +161,10 @@ add_subdirectory(iosource)
add_subdirectory(logging)
add_subdirectory(probabilistic)
if ( ENABLE_BROKER )
add_subdirectory(comm)
endif ()
set(bro_SUBDIRS
# Order is important here.
${bro_PLUGIN_LIBS}

15
src/comm/CMakeLists.txt Normal file
View file

@ -0,0 +1,15 @@
include(BroSubdir)
include_directories(BEFORE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
)
set(comm_SRCS
Manager.cc
)
bif_target(comm.bif)
bro_add_subdir_library(comm ${comm_SRCS} ${BIF_OUTPUT_CC})
add_dependencies(bro_comm generate_outputs)

115
src/comm/Manager.cc Normal file
View file

@ -0,0 +1,115 @@
#include "Manager.h"
#include <broker/broker.hh>
#include <cstdio>
#include <unistd.h>
#include "util.h"
#include "Reporter.h"
bool comm::Manager::InitPreScript()
{
auto res = broker::init();
if ( res )
{
fprintf(stderr, "broker::init failed: %s\n", broker::strerror(res));
return false;
}
char host[256];
const char* name;
if ( gethostname(host, sizeof(host)) == 0 )
name = fmt("bro@%s.%ld", host, static_cast<long>(getpid()));
else
name = fmt("bro@<unknown>.%ld", static_cast<long>(getpid()));
endpoint = std::unique_ptr<broker::endpoint>(new broker::endpoint(name));
return true;
}
bool comm::Manager::InitPostScript()
{
return true;
}
bool comm::Manager::Listen(uint16_t port, const char* addr)
{
auto rval = endpoint->listen(port, addr);
if ( ! rval )
{
reporter->Error("Failed to listen on %s:%" PRIu16 " : %s",
addr ? addr : "INADDR_ANY", port,
endpoint->last_error().data());
}
return rval;
}
bool comm::Manager::Connect(string addr, uint16_t port,
std::chrono::duration<double> retry_interval)
{
auto& peer = peers[std::make_pair(addr, port)];
if ( peer )
return false;
peer = endpoint->peer(std::move(addr), port, retry_interval);
return true;
}
bool comm::Manager::Disconnect(const string& addr, uint16_t port)
{
auto it = peers.find(std::make_pair(addr, port));
if ( it == peers.end() )
return false;
return endpoint->unpeer(it->second);
}
void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except)
{
read->Insert(endpoint->peer_status().fd());
}
double comm::Manager::NextTimestamp(double* local_network_time)
{
// TODO: do something better?
return timer_mgr->Time();
}
void comm::Manager::Process()
{
bool idle = true;
auto peer_status_updates = endpoint->peer_status().want_pop();
if ( ! peer_status_updates.empty() )
idle = false;
for ( auto& u : peer_status_updates )
{
if ( ! u.relation.remote() )
continue;
// TODO: generate events
switch ( u.status ) {
case broker::peer_status::tag::established:
printf("established\n");
break;
case broker::peer_status::tag::disconnected:
printf("disconnected\n");
break;
case broker::peer_status::tag::incompatible:
printf("incompatible\n");
break;
default:
reporter->InternalWarning("unknown broker::peer_status::tag : %d",
static_cast<int>(u.status));
break;
}
}
SetIdle(idle);
}

52
src/comm/Manager.h Normal file
View file

@ -0,0 +1,52 @@
#ifndef BRO_COMM_MANAGER_H
#define BRO_COMM_MANAGER_H
#include <broker/endpoint.hh>
#include <memory>
#include <string>
#include <map>
#include "Reporter.h"
#include "iosource/IOSource.h"
namespace comm {
// TODO: documentation
// Manages various forms of communication between peer Bro processes
// or possibly between different parts of a single Bro process.
class Manager : public iosource::IOSource {
public:
bool InitPreScript();
bool InitPostScript();
bool Listen(uint16_t port, const char* addr = nullptr);
bool Connect(std::string addr, uint16_t port,
std::chrono::duration<double> retry_interval);
bool Disconnect(const std::string& addr, uint16_t port);
private:
// IOSource interface overrides:
void GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
iosource::FD_Set* except) override;
double NextTimestamp(double* local_network_time) override;
void Process() override;
const char* Tag() override
{ return "Comm::Manager"; }
std::unique_ptr<broker::endpoint> endpoint;
std::map<std::pair<std::string, uint16_t>, broker::peering> peers;
};
} // namespace comm
extern comm::Manager* comm_mgr;
#endif // BRO_COMM_MANAGER_H

43
src/comm/comm.bif Normal file
View file

@ -0,0 +1,43 @@
module Comm;
%%{
#include "comm/Manager.h"
%%}
function Comm::listen%(p: port, a: string &default=""%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("listen port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0);
return new Val(rval, TYPE_BOOL);
%}
function Comm::connect%(a: string, p: port, retry: interval%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Connect(a->CheckString(), p->Port(),
std::chrono::duration<double>(retry));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disconnect%(a: string, p: port%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port());
return new Val(rval, TYPE_BOOL);
%}

View file

@ -63,6 +63,10 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void);
#include "3rdparty/sqlite3.h"
#ifdef ENABLE_BROKER
#include <comm/Manager.h>
#endif
Brofiler brofiler;
#ifndef HAVE_STRSEP
@ -94,6 +98,9 @@ analyzer::Manager* analyzer_mgr = 0;
file_analysis::Manager* file_mgr = 0;
broxygen::Manager* broxygen_mgr = 0;
iosource::Manager* iosource_mgr = 0;
#ifdef ENABLE_BROKER
comm::Manager* comm_mgr = 0;
#endif
Stmt* stmts;
EventHandlerPtr net_done = 0;
RuleMatcher* rule_matcher = 0;
@ -851,6 +858,16 @@ int main(int argc, char** argv)
input_mgr = new input::Manager();
file_mgr = new file_analysis::Manager();
#ifdef ENABLE_BROKER
comm_mgr = new comm::Manager();
if ( ! comm_mgr->InitPreScript() )
{
fprintf(stderr, "Failed to initialize communication manager.");
exit(1);
}
#endif
plugin_mgr->InitPreScript();
analyzer_mgr->InitPreScript();
file_mgr->InitPreScript();
@ -925,6 +942,11 @@ int main(int argc, char** argv)
exit(rc);
}
#ifdef ENABLE_BROKER
comm_mgr->InitPostScript();
iosource_mgr->Register(comm_mgr);
#endif
#ifdef USE_PERFTOOLS_DEBUG
}
#endif