diff --git a/CMakeLists.txt b/CMakeLists.txt index 1f0fcf8d07..b31e60ac01 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -181,6 +181,8 @@ if ( ENABLE_BROKER ) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") add_subdirectory(aux/broker) set(brodeps ${brodeps} broker) + add_definitions(-DENABLE_BROKER) + include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR}/aux/broker) endif () add_subdirectory(src) diff --git a/aux/broker b/aux/broker index a1b51def07..331966d1f3 160000 --- a/aux/broker +++ b/aux/broker @@ -1 +1 @@ -Subproject commit a1b51def07cfb191d0a83a78c7102560740dbcb3 +Subproject commit 331966d1f3d24c63bedbda79e477f759c4d267f9 diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 13c6e45006..55ca12c873 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -161,6 +161,10 @@ add_subdirectory(iosource) add_subdirectory(logging) add_subdirectory(probabilistic) +if ( ENABLE_BROKER ) + add_subdirectory(comm) +endif () + set(bro_SUBDIRS # Order is important here. ${bro_PLUGIN_LIBS} diff --git a/src/comm/CMakeLists.txt b/src/comm/CMakeLists.txt new file mode 100644 index 0000000000..c152adc49a --- /dev/null +++ b/src/comm/CMakeLists.txt @@ -0,0 +1,15 @@ +include(BroSubdir) + +include_directories(BEFORE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} +) + +set(comm_SRCS + Manager.cc +) + +bif_target(comm.bif) + +bro_add_subdir_library(comm ${comm_SRCS} ${BIF_OUTPUT_CC}) +add_dependencies(bro_comm generate_outputs) diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc new file mode 100644 index 0000000000..ac67208ca6 --- /dev/null +++ b/src/comm/Manager.cc @@ -0,0 +1,115 @@ +#include "Manager.h" +#include +#include +#include +#include "util.h" +#include "Reporter.h" + +bool comm::Manager::InitPreScript() + { + auto res = broker::init(); + + if ( res ) + { + fprintf(stderr, "broker::init failed: %s\n", broker::strerror(res)); + return false; + } + + char host[256]; + const char* name; + + if ( gethostname(host, sizeof(host)) == 0 ) + name = fmt("bro@%s.%ld", host, static_cast(getpid())); + else + name = fmt("bro@.%ld", static_cast(getpid())); + + endpoint = std::unique_ptr(new broker::endpoint(name)); + return true; + } + +bool comm::Manager::InitPostScript() + { + return true; + } + +bool comm::Manager::Listen(uint16_t port, const char* addr) + { + auto rval = endpoint->listen(port, addr); + + if ( ! rval ) + { + reporter->Error("Failed to listen on %s:%" PRIu16 " : %s", + addr ? addr : "INADDR_ANY", port, + endpoint->last_error().data()); + } + + return rval; + } + +bool comm::Manager::Connect(string addr, uint16_t port, + std::chrono::duration retry_interval) + { + auto& peer = peers[std::make_pair(addr, port)]; + + if ( peer ) + return false; + + peer = endpoint->peer(std::move(addr), port, retry_interval); + return true; + } + +bool comm::Manager::Disconnect(const string& addr, uint16_t port) + { + auto it = peers.find(std::make_pair(addr, port)); + + if ( it == peers.end() ) + return false; + + return endpoint->unpeer(it->second); + } + +void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, + iosource::FD_Set* except) + { + read->Insert(endpoint->peer_status().fd()); + } + +double comm::Manager::NextTimestamp(double* local_network_time) + { + // TODO: do something better? + return timer_mgr->Time(); + } + +void comm::Manager::Process() + { + bool idle = true; + auto peer_status_updates = endpoint->peer_status().want_pop(); + + if ( ! peer_status_updates.empty() ) + idle = false; + + for ( auto& u : peer_status_updates ) + { + if ( ! u.relation.remote() ) + continue; + + // TODO: generate events + switch ( u.status ) { + case broker::peer_status::tag::established: + printf("established\n"); + break; + case broker::peer_status::tag::disconnected: + printf("disconnected\n"); + break; + case broker::peer_status::tag::incompatible: + printf("incompatible\n"); + break; + default: + reporter->InternalWarning("unknown broker::peer_status::tag : %d", + static_cast(u.status)); + break; + } + } + + SetIdle(idle); + } diff --git a/src/comm/Manager.h b/src/comm/Manager.h new file mode 100644 index 0000000000..412c125d14 --- /dev/null +++ b/src/comm/Manager.h @@ -0,0 +1,52 @@ +#ifndef BRO_COMM_MANAGER_H +#define BRO_COMM_MANAGER_H + +#include +#include +#include +#include +#include "Reporter.h" +#include "iosource/IOSource.h" + +namespace comm { + +// TODO: documentation + +// Manages various forms of communication between peer Bro processes +// or possibly between different parts of a single Bro process. +class Manager : public iosource::IOSource { +public: + + bool InitPreScript(); + + bool InitPostScript(); + + bool Listen(uint16_t port, const char* addr = nullptr); + + bool Connect(std::string addr, uint16_t port, + std::chrono::duration retry_interval); + + bool Disconnect(const std::string& addr, uint16_t port); + +private: + + // IOSource interface overrides: + void GetFds(iosource::FD_Set* read, iosource::FD_Set* write, + iosource::FD_Set* except) override; + + double NextTimestamp(double* local_network_time) override; + + void Process() override; + + const char* Tag() override + { return "Comm::Manager"; } + + std::unique_ptr endpoint; + std::map, broker::peering> peers; +}; + +} // namespace comm + +extern comm::Manager* comm_mgr; + +#endif // BRO_COMM_MANAGER_H diff --git a/src/comm/comm.bif b/src/comm/comm.bif new file mode 100644 index 0000000000..ce54b916ca --- /dev/null +++ b/src/comm/comm.bif @@ -0,0 +1,43 @@ + +module Comm; + +%%{ +#include "comm/Manager.h" +%%} + +function Comm::listen%(p: port, a: string &default=""%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("listen port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::connect%(a: string, p: port, retry: interval%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("remote connection port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Connect(a->CheckString(), p->Port(), + std::chrono::duration(retry)); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::disconnect%(a: string, p: port%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("remote connection port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port()); + return new Val(rval, TYPE_BOOL); + %} diff --git a/src/main.cc b/src/main.cc index 15aea3d3fe..a7099cb90b 100644 --- a/src/main.cc +++ b/src/main.cc @@ -63,6 +63,10 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void); #include "3rdparty/sqlite3.h" +#ifdef ENABLE_BROKER +#include +#endif + Brofiler brofiler; #ifndef HAVE_STRSEP @@ -94,6 +98,9 @@ analyzer::Manager* analyzer_mgr = 0; file_analysis::Manager* file_mgr = 0; broxygen::Manager* broxygen_mgr = 0; iosource::Manager* iosource_mgr = 0; +#ifdef ENABLE_BROKER +comm::Manager* comm_mgr = 0; +#endif Stmt* stmts; EventHandlerPtr net_done = 0; RuleMatcher* rule_matcher = 0; @@ -851,6 +858,16 @@ int main(int argc, char** argv) input_mgr = new input::Manager(); file_mgr = new file_analysis::Manager(); +#ifdef ENABLE_BROKER + comm_mgr = new comm::Manager(); + + if ( ! comm_mgr->InitPreScript() ) + { + fprintf(stderr, "Failed to initialize communication manager."); + exit(1); + } +#endif + plugin_mgr->InitPreScript(); analyzer_mgr->InitPreScript(); file_mgr->InitPreScript(); @@ -925,6 +942,11 @@ int main(int argc, char** argv) exit(rc); } +#ifdef ENABLE_BROKER + comm_mgr->InitPostScript(); + iosource_mgr->Register(comm_mgr); +#endif + #ifdef USE_PERFTOOLS_DEBUG } #endif