diff --git a/scripts/base/frameworks/comm/main.bro b/scripts/base/frameworks/comm/main.bro index 974e5e43af..a2cd1f6ac0 100644 --- a/scripts/base/frameworks/comm/main.bro +++ b/scripts/base/frameworks/comm/main.bro @@ -15,9 +15,11 @@ export { d: opaque of Comm::Data &optional; }; + type DataVector: vector of Comm::Data; + type EventArgs: record { name: string &optional; # nil for invalid event/args. - args: vector of Comm::Data; + args: DataVector; }; type Comm::TableItem : record { @@ -25,3 +27,37 @@ export { val: Comm::Data; }; } + +module Store; + +export { + + type QueryStatus: enum { + SUCCESS, + FAILURE, + }; + + type ExpiryTime: record { + absolute: time &optional; + since_last_modification: interval &optional; + }; + + type QueryResult: record { + status: Store::QueryStatus; + result: Comm::Data; + }; + + type SQLiteOptions: record { + path: string &default = "store.sqlite"; + }; + + type RocksDBOptions: record { + path: string &default = "store.rocksdb"; + use_merge_operator: bool &default = F; + }; + + type BackendOptions: record { + sqlite: SQLiteOptions &default = SQLiteOptions(); + rocksdb: RocksDBOptions &default = RocksDBOptions(); + }; +} diff --git a/src/Trigger.cc b/src/Trigger.cc index 099027f4e0..772a991791 100644 --- a/src/Trigger.cc +++ b/src/Trigger.cc @@ -112,6 +112,7 @@ Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts, attached = 0; is_return = arg_is_return; location = arg_location; + timeout_value = -1; ++total_triggers; @@ -133,17 +134,22 @@ Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts, Val* timeout_val = arg_timeout ? arg_timeout->Eval(arg_frame) : 0; + if ( timeout_val ) + { + Unref(timeout_val); + timeout_value = timeout_val->AsInterval(); + } + // Make sure we don't get deleted if somebody calls a method like // Timeout() while evaluating the trigger. Ref(this); - if ( ! Eval() && timeout_val ) + if ( ! Eval() && timeout_value >= 0 ) { - timer = new TriggerTimer(timeout_val->AsInterval(), this); + timer = new TriggerTimer(timeout_value, this); timer_mgr->Add(timer); } - Unref(timeout_val); Unref(this); } diff --git a/src/Trigger.h b/src/Trigger.h index b752ea8ada..7662901dc5 100644 --- a/src/Trigger.h +++ b/src/Trigger.h @@ -32,6 +32,10 @@ public: // Executes timeout code and deletes the object. void Timeout(); + // Return the timeout interval (negative if none was specified). + double TimeoutValue() const + { return timeout_value; } + // Called if another entity needs to complete its operations first // in any case before this trigger can proceed. void Hold() { delayed = true; } @@ -87,6 +91,7 @@ private: Stmt* body; Stmt* timeout_stmts; Expr* timeout; + double timeout_value; Frame* frame; bool is_return; const Location* location; diff --git a/src/comm/CMakeLists.txt b/src/comm/CMakeLists.txt index 95ad701d71..da726e54d6 100644 --- a/src/comm/CMakeLists.txt +++ b/src/comm/CMakeLists.txt @@ -5,12 +5,20 @@ include_directories(BEFORE ${CMAKE_CURRENT_BINARY_DIR} ) +if ( ROCKSDB_INCLUDE_DIR ) + add_definitions(-DHAVE_ROCKSDB) + include_directories(BEFORE ${ROCKSDB_INCLUDE_DIR}) +endif () + set(comm_SRCS Data.cc Manager.cc + Store.cc ) -bif_target(comm.bif) +bif_target(data.bif) +bif_target(messaging.bif) +bif_target(store.bif) bro_add_subdir_library(comm ${comm_SRCS} ${BIF_OUTPUT_CC}) add_dependencies(bro_comm generate_outputs) diff --git a/src/comm/Data.cc b/src/comm/Data.cc index f32bedd885..3b1a240988 100644 --- a/src/comm/Data.cc +++ b/src/comm/Data.cc @@ -1,5 +1,5 @@ #include "Data.h" -#include "comm/comm.bif.h" +#include "comm/data.bif.h" using namespace std; diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index e64c74c377..9f17878cf6 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -1,12 +1,15 @@ #include "Manager.h" #include "Data.h" +#include "Store.h" #include #include #include #include "util.h" #include "Var.h" #include "Reporter.h" -#include "comm/comm.bif.h" +#include "comm/data.bif.h" +#include "comm/messaging.bif.h" +#include "comm/store.bif.h" #include "logging/Manager.h" using namespace std; @@ -17,6 +20,12 @@ int comm::Manager::send_flags_self_idx; int comm::Manager::send_flags_peers_idx; int comm::Manager::send_flags_unsolicited_idx; +comm::Manager::~Manager() + { + for ( auto& s : data_stores ) + CloseStore(s.first); + } + bool comm::Manager::InitPreScript() { return true; @@ -47,6 +56,7 @@ bool comm::Manager::InitPostScript() comm::opaque_of_table_iterator = new OpaqueType("Comm::TableIterator"); comm::opaque_of_vector_iterator = new OpaqueType("Comm::VectorIterator"); comm::opaque_of_record_iterator = new OpaqueType("Comm::RecordIterator"); + comm::opaque_of_store_handle = new OpaqueType("Store::Handle"); vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref()); auto res = broker::init(); @@ -385,6 +395,9 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write, for ( const auto& ps : log_subscriptions ) read->Insert(ps.second.fd()); + + for ( const auto& s : data_stores ) + read->Insert(s.second->store->responses().fd()); } double comm::Manager::NextTimestamp(double* local_network_time) @@ -393,6 +406,49 @@ double comm::Manager::NextTimestamp(double* local_network_time) return timer_mgr->Time(); } +struct response_converter { + using result_type = RecordVal*; + + result_type operator()(bool d) + { + return comm::make_data_val(broker::data{d}); + } + + result_type operator()(uint64_t d) + { + return comm::make_data_val(broker::data{d}); + } + + result_type operator()(broker::data& d) + { + return comm::make_data_val(move(d)); + } + + result_type operator()(std::vector& d) + { + return comm::make_data_val(broker::data{move(d)}); + } + + result_type operator()(broker::store::snapshot& d) + { + broker::table table; + + for ( auto& item : d.entries ) + { + auto& key = item.first; + auto& val = item.second.item; + table[move(key)] = move(val); + } + + return comm::make_data_val(broker::data{move(table)}); + } +}; + +static RecordVal* response_to_val(broker::store::response r) + { + return broker::visit(response_converter{}, r.reply.value); + } + void comm::Manager::Process() { bool idle = true; @@ -624,5 +680,92 @@ void comm::Manager::Process() } } + for ( const auto& s : data_stores ) + { + auto responses = s.second->store->responses().want_pop(); + + if ( responses.empty() ) + continue; + + idle = false; + + for ( auto& response : responses ) + { + auto ck = static_cast(response.cookie); + auto it = pending_queries.find(ck); + + if ( it == pending_queries.end() ) + { + reporter->Warning("unmatched response to query on store %s", + s.second->store->id().data()); + continue; + } + + auto query = *it; + + switch ( response.reply.stat ) { + case broker::store::result::status::timeout: + // Fine, trigger's timeout takes care of things. + break; + case broker::store::result::status::failure: + query->Result(query_result()); + break; + case broker::store::result::status::success: + query->Result(query_result(response_to_val(move(response)))); + break; + default: + reporter->InternalWarning("unknown store response status: %d", + static_cast(response.reply.stat)); + break; + } + + pending_queries.erase(it); + } + } + SetIdle(idle); } + +bool comm::Manager::AddStore(StoreHandleVal* handle) + { + if ( ! handle->store ) + return false; + + if ( data_stores.find(handle->store->id()) != data_stores.end() ) + return false; + + data_stores[handle->store->id()] = handle; + Ref(handle); + return true; + } + +bool comm::Manager::CloseStore(const broker::store::identifier& id) + { + auto it = data_stores.find(id); + + if ( it == data_stores.end() ) + return false; + + for ( auto it = pending_queries.begin(); it != pending_queries.end(); ) + { + auto query = *it; + + if ( query->StoreID() == id ) + { + it = pending_queries.erase(it); + query->Abort(); + delete query; + } + else + ++it; + } + + it->second->store = nullptr; + Unref(it->second); + return true; + } + +bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb) + { + return pending_queries.insert(cb).second; + } diff --git a/src/comm/Manager.h b/src/comm/Manager.h index 44f5eb0f2b..c9cc2c8464 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include "comm/Store.h" #include "Reporter.h" #include "iosource/IOSource.h" #include "Val.h" @@ -17,8 +19,11 @@ namespace comm { // Manages various forms of communication between peer Bro processes // or possibly between different parts of a single Bro process. class Manager : public iosource::IOSource { +friend class StoreHandleVal; public: + ~Manager(); + bool InitPreScript(); bool InitPostScript(); @@ -56,6 +61,12 @@ public: bool UnsubscribeToLogs(const std::string& topic_prefix); + bool AddStore(StoreHandleVal* handle); + + bool CloseStore(const broker::store::identifier& id); + + bool TrackStoreQuery(StoreQueryCallback* cb); + static int GetFlags(Val* flags); private: @@ -71,12 +82,18 @@ private: const char* Tag() override { return "Comm::Manager"; } + broker::endpoint& Endpoint() + { return *endpoint; } + std::unique_ptr endpoint; std::map, broker::peering> peers; std::map print_subscriptions; std::map event_subscriptions; std::map log_subscriptions; + std::map data_stores; + std::unordered_set pending_queries; + static VectorType* vector_of_data_type; static EnumType* log_id_type; static int send_flags_self_idx; diff --git a/src/comm/Store.cc b/src/comm/Store.cc new file mode 100644 index 0000000000..0d94795ce8 --- /dev/null +++ b/src/comm/Store.cc @@ -0,0 +1,141 @@ +#include "Store.h" +#include "comm/Manager.h" + +#include +#include +#include + +#ifdef HAVE_ROCKSDB +#include +#include +#endif + +OpaqueType* comm::opaque_of_store_handle; + +comm::StoreHandleVal::StoreHandleVal(broker::store::identifier id, + comm::StoreType arg_type, + broker::util::optional arg_back, + RecordVal* backend_options, std::chrono::duration resync) + : OpaqueVal(opaque_of_store_handle), + store(), store_type(arg_type), backend_type(arg_back) + { + using BifEnum::Store::BackendType; + std::unique_ptr backend; + + if ( backend_type ) + switch ( *backend_type ) { + case BackendType::MEMORY: + backend.reset(new broker::store::memory_backend); + break; + case BackendType::SQLITE: + { + auto sqlite = new broker::store::sqlite_backend; + std::string path = backend_options->Lookup(0)->AsRecordVal() + ->Lookup(0)->AsStringVal()->CheckString(); + + if ( sqlite->open(path) ) + backend.reset(sqlite); + else + { + reporter->Error("failed to open sqlite backend at path %s: %s", + path.data(), sqlite->last_error().data()); + delete sqlite; + } + } + break; + case BackendType::ROCKSDB: + { +#ifdef HAVE_ROCKSDB + std::string path = backend_options->Lookup(1)->AsRecordVal() + ->Lookup(0)->AsStringVal()->CheckString(); + bool use_merge_op = backend_options->Lookup(1)->AsRecordVal() + ->Lookup(1)->AsBool(); + rocksdb::Options rock_op; + rock_op.create_if_missing = true; + + if ( use_merge_op ) + options.merge_operator.reset(new rocksdb_merge_operator); + + auto rocksdb = new broker::store::rocksdb_backend; + + if ( rocksdb->open(path, options).ok() ) + backend.reset(rocksdb); + else + { + reporter->Error("failed to open rocksdb backend at path %s: %s", + path.data(), rocksdb->last_error().data()); + delete rocksdb; + } +#else + reporter->Error("rocksdb backend support is not enabled"); +#endif + } + break; + default: + reporter->FatalError("unknown data store backend: %d", + static_cast(*backend_type)); + } + + switch ( store_type ) { + case StoreType::FRONTEND: + store.reset(new broker::store::frontend(comm_mgr->Endpoint(), + move(id))); + break; + case StoreType::MASTER: + store.reset(new broker::store::master(comm_mgr->Endpoint(), + move(id), move(backend))); + break; + case StoreType::CLONE: + store.reset(new broker::store::clone(comm_mgr->Endpoint(), + move(id), resync, + move(backend))); + break; + default: + reporter->FatalError("unknown data store type: %d", + static_cast(store_type)); + } + } + +void comm::StoreHandleVal::ValDescribe(ODesc* d) const + { + using BifEnum::Store::BackendType; + d->Add("broker::store::"); + + switch ( store_type ) { + case StoreType::FRONTEND: + d->Add("frontend"); + break; + case StoreType::MASTER: + d->Add("master"); + break; + case StoreType::CLONE: + d->Add("clone"); + break; + default: + d->Add("unknown"); + } + + d->Add("{"); + d->Add(store->id()); + + if ( backend_type ) + { + d->Add(", "); + + switch ( *backend_type ) { + case BackendType::MEMORY: + d->Add("memory"); + break; + case BackendType::SQLITE: + d->Add("sqlite"); + break; + case BackendType::ROCKSDB: + d->Add("rocksdb"); + break; + default: + d->Add("unknown"); + } + } + + d->Add("}"); + } diff --git a/src/comm/Store.h b/src/comm/Store.h new file mode 100644 index 0000000000..b3a8ccb339 --- /dev/null +++ b/src/comm/Store.h @@ -0,0 +1,113 @@ +#ifndef BRO_COMM_STORE_H +#define BRO_COMM_STORE_H + +#include "comm/store.bif.h" +#include "comm/data.bif.h" +#include "Reporter.h" +#include "Type.h" +#include "Val.h" +#include "Trigger.h" + +#include + +namespace comm { + +extern OpaqueType* opaque_of_store_handle; + +enum StoreType { + FRONTEND, + MASTER, + CLONE, +}; + +inline EnumVal* query_status(bool success) + { + static EnumType* store_query_status = nullptr; + static int success_val; + static int failure_val; + + if ( ! store_query_status ) + { + store_query_status = internal_type("Store::QueryStatus")->AsEnumType(); + success_val = store_query_status->Lookup("Store", "SUCCESS"); + failure_val = store_query_status->Lookup("Store", "FAILURE"); + } + + return new EnumVal(success ? success_val : failure_val, store_query_status); + } + +inline RecordVal* query_result() + { + auto rval = new RecordVal(BifType::Record::Store::QueryResult); + rval->Assign(0, query_status(false)); + rval->Assign(1, new RecordVal(BifType::Record::Comm::Data)); + return rval; + } + +inline RecordVal* query_result(RecordVal* data) + { + auto rval = new RecordVal(BifType::Record::Store::QueryResult); + rval->Assign(0, query_status(true)); + rval->Assign(1, data); + return rval; + } + +class StoreQueryCallback { +public: + + StoreQueryCallback(Trigger* arg_trigger, const CallExpr* arg_call, + broker::store::identifier arg_store_id) + : trigger(arg_trigger), call(arg_call), store_id(move(arg_store_id)) + { + Ref(trigger); + } + + ~StoreQueryCallback() + { + Unref(trigger); + } + + void Result(RecordVal* result) + { + trigger->Cache(call, result); + trigger->Release(); + Unref(result); + } + + void Abort() + { + auto result = query_result(); + trigger->Cache(call, result); + trigger->Release(); + Unref(result); + } + + const broker::store::identifier& StoreID() const + { return store_id; } + +private: + + Trigger* trigger; + const CallExpr* call; + broker::store::identifier store_id; +}; + +class StoreHandleVal : public OpaqueVal { +public: + + StoreHandleVal(broker::store::identifier id, + comm::StoreType arg_type, + broker::util::optional arg_back, + RecordVal* backend_options, + std::chrono::duration resync = std::chrono::seconds(1)); + + void ValDescribe(ODesc* d) const override; + + std::unique_ptr store; + comm::StoreType store_type; + broker::util::optional backend_type; +}; + +} // namespace comm + +#endif // BRO_COMM_STORE_H diff --git a/src/comm/comm.bif b/src/comm/data.bif similarity index 76% rename from src/comm/comm.bif rename to src/comm/data.bif index b2ce0fb415..2a78a9229a 100644 --- a/src/comm/comm.bif +++ b/src/comm/data.bif @@ -1,8 +1,8 @@ +##! Functions for inspecting and manipulating broker data. + %%{ -#include "comm/Manager.h" #include "comm/Data.h" -#include "logging/Manager.h" %%} module Comm; @@ -25,12 +25,8 @@ enum DataType %{ RECORD, %} -type Comm::SendFlags: record; - type Comm::Data: record; -type Comm::EventArgs: record; - type Comm::TableItem: record; function Comm::data%(d: any%): Comm::Data @@ -507,146 +503,3 @@ function Comm::record_iterator_value%(it: opaque of Comm::RecordIterator%): Comm rval->Assign(0, new comm::DataVal(**ri->it)); return rval; %} - -event Comm::remote_connection_established%(peer_address: string, - peer_port: port, - peer_name: string%); - -event Comm::remote_connection_broken%(peer_address: string, - peer_port: port%); - -event Comm::remote_connection_incompatible%(peer_address: string, - peer_port: port%); - -function Comm::listen%(p: port, a: string &default = "", - reuse: bool &default = T%): bool - %{ - if ( ! p->IsTCP() ) - { - reporter->Error("listen port must use tcp"); - return new Val(false, TYPE_BOOL); - } - - auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0, - reuse); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::connect%(a: string, p: port, retry: interval%): bool - %{ - if ( ! p->IsTCP() ) - { - reporter->Error("remote connection port must use tcp"); - return new Val(false, TYPE_BOOL); - } - - auto rval = comm_mgr->Connect(a->CheckString(), p->Port(), - std::chrono::duration(retry)); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::disconnect%(a: string, p: port%): bool - %{ - if ( ! p->IsTCP() ) - { - reporter->Error("remote connection port must use tcp"); - return new Val(false, TYPE_BOOL); - } - - auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port()); - return new Val(rval, TYPE_BOOL); - %} - -event Comm::print_handler%(msg: string%); - -function Comm::print%(topic: string, msg: string, - flags: SendFlags &default = SendFlags()%): bool - %{ - auto rval = comm_mgr->Print(topic->CheckString(), msg->CheckString(), - flags); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::subscribe_to_prints%(topic_prefix: string%): bool - %{ - auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString()); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::unsubscribe_to_prints%(topic_prefix: string%): bool - %{ - auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString()); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::event_args%(...%): Comm::EventArgs - %{ - auto rval = comm_mgr->MakeEventArgs(@ARGS@); - return rval; - %} - -function Comm::event%(topic: string, args: Comm::EventArgs, - flags: SendFlags &default = SendFlags()%): bool - %{ - auto rval = comm_mgr->Event(topic->CheckString(), args->AsRecordVal(), - flags); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::auto_event%(topic: string, ev: any, - flags: SendFlags &default = SendFlags()%): bool - %{ - auto rval = comm_mgr->AutoEvent(topic->CheckString(), ev, flags); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::auto_event_stop%(topic: string, ev: any%): bool - %{ - auto rval = comm_mgr->AutoEventStop(topic->CheckString(), ev); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::subscribe_to_events%(topic_prefix: string%): bool - %{ - auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString()); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::unsubscribe_to_events%(topic_prefix: string%): bool - %{ - auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString()); - return new Val(rval, TYPE_BOOL); - %} - -function -Comm::enable_remote_logs%(id: Log::ID, - flags: SendFlags &default = SendFlags()%): bool - %{ - auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(), - comm::Manager::GetFlags(flags)); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::disable_remote_logs%(id: Log::ID%): bool - %{ - auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal()); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::remote_logs_enabled%(id: Log::ID%): bool - %{ - auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal()); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::subscribe_to_logs%(topic_prefix: string%): bool - %{ - auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString()); - return new Val(rval, TYPE_BOOL); - %} - -function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool - %{ - auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString()); - return new Val(rval, TYPE_BOOL); - %} diff --git a/src/comm/messaging.bif b/src/comm/messaging.bif new file mode 100644 index 0000000000..f5034f842f --- /dev/null +++ b/src/comm/messaging.bif @@ -0,0 +1,156 @@ + +##! Functions for peering and various messaging patterns (e.g. print/log/event). + +%%{ +#include "comm/Manager.h" +#include "logging/Manager.h" +%%} + +module Comm; + +type Comm::SendFlags: record; + +type Comm::EventArgs: record; + +event Comm::remote_connection_established%(peer_address: string, + peer_port: port, + peer_name: string%); + +event Comm::remote_connection_broken%(peer_address: string, + peer_port: port%); + +event Comm::remote_connection_incompatible%(peer_address: string, + peer_port: port%); + +function Comm::listen%(p: port, a: string &default = "", + reuse: bool &default = T%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("listen port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0, + reuse); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::connect%(a: string, p: port, retry: interval%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("remote connection port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Connect(a->CheckString(), p->Port(), + std::chrono::duration(retry)); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::disconnect%(a: string, p: port%): bool + %{ + if ( ! p->IsTCP() ) + { + reporter->Error("remote connection port must use tcp"); + return new Val(false, TYPE_BOOL); + } + + auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port()); + return new Val(rval, TYPE_BOOL); + %} + +event Comm::print_handler%(msg: string%); + +function Comm::print%(topic: string, msg: string, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = comm_mgr->Print(topic->CheckString(), msg->CheckString(), + flags); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::subscribe_to_prints%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::unsubscribe_to_prints%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::event_args%(...%): Comm::EventArgs + %{ + auto rval = comm_mgr->MakeEventArgs(@ARGS@); + return rval; + %} + +function Comm::event%(topic: string, args: Comm::EventArgs, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = comm_mgr->Event(topic->CheckString(), args->AsRecordVal(), + flags); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::auto_event%(topic: string, ev: any, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = comm_mgr->AutoEvent(topic->CheckString(), ev, flags); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::auto_event_stop%(topic: string, ev: any%): bool + %{ + auto rval = comm_mgr->AutoEventStop(topic->CheckString(), ev); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::subscribe_to_events%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::unsubscribe_to_events%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function +Comm::enable_remote_logs%(id: Log::ID, + flags: SendFlags &default = SendFlags()%): bool + %{ + auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(), + comm::Manager::GetFlags(flags)); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::disable_remote_logs%(id: Log::ID%): bool + %{ + auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::remote_logs_enabled%(id: Log::ID%): bool + %{ + auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::subscribe_to_logs%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} + +function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool + %{ + auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString()); + return new Val(rval, TYPE_BOOL); + %} diff --git a/src/comm/store.bif b/src/comm/store.bif new file mode 100644 index 0000000000..fb4c8d57ce --- /dev/null +++ b/src/comm/store.bif @@ -0,0 +1,378 @@ + +##! Functions to interface with broker's distributed data store. + +%%{ +#include "comm/Manager.h" +#include "comm/Store.h" +#include "comm/Data.h" +#include "Trigger.h" +%%} + +module Store; + +type Store::ExpiryTime: record; + +type Store::QueryResult: record; + +type Store::BackendOptions: record; + +enum BackendType %{ + MEMORY, + SQLITE, + ROCKSDB, +%} + +function Store::create_master%(id: string, b: BackendType &default = MEMORY, + options: BackendOptions &default = BackendOptions()%): opaque of Store::Handle + %{ + auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::MASTER, + static_cast(b->AsEnum()), + options->AsRecordVal()); + comm_mgr->AddStore(rval); + return rval; + %} + +function Store::create_clone%(id: string, b: BackendType &default = MEMORY, + options: BackendOptions &default = BackendOptions(), + resync: interval &default = 1sec%): opaque of Store::Handle + %{ + auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::CLONE, + static_cast(b->AsEnum()), + options->AsRecordVal(), + std::chrono::duration(resync)); + comm_mgr->AddStore(rval); + return rval; + %} + +function Store::create_frontend%(id: string%): opaque of Store::Handle + %{ + auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::FRONTEND, + {}, nullptr); + comm_mgr->AddStore(rval); + return rval; + %} + +function Store::close_by_name%(id: string%): bool + %{ + return new Val(comm_mgr->CloseStore(id->CheckString()), TYPE_BOOL); + %} + +function Store::close_by_handle%(h: opaque of Store::Handle%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + return new Val(comm_mgr->CloseStore(handle->store->id()), TYPE_BOOL); + %} + +########################### +# non-blocking update API # +########################### + +function Store::insert%(h: opaque of Store::Handle, + k: Comm::Data, v: Comm::Data, + e: Store::ExpiryTime &default = Store::ExpiryTime()%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + auto& val = comm::opaque_field_to_data(v->AsRecordVal(), frame); + + broker::util::optional expiry; + + auto abs_expiry_val = e->AsRecordVal()->Lookup(0); + auto rel_expiry_val = e->AsRecordVal()->Lookup(1); + + if ( abs_expiry_val ) + { + auto tag = broker::store::expiration_time::tag::absolute; + expiry = broker::store::expiration_time(abs_expiry_val->AsTime(), tag); + } + else if ( rel_expiry_val ) + { + auto tag = broker::store::expiration_time::tag::since_last_modification; + expiry = broker::store::expiration_time(rel_expiry_val->AsInterval(), tag); + } + + handle->store->insert(key, val, expiry); + return new Val(true, TYPE_BOOL); + %} + +function Store::erase%(h: opaque of Store::Handle, k: Comm::Data%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + handle->store->erase(key); + return new Val(true, TYPE_BOOL); + %} + +function Store::clear%(h: opaque of Store::Handle%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + handle->store->clear(); + return new Val(true, TYPE_BOOL); + %} + +function Store::increment%(h: opaque of Store::Handle, + k: Comm::Data, by: int%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + handle->store->increment(key, by); + return new Val(true, TYPE_BOOL); + %} + +function Store::decrement%(h: opaque of Store::Handle, + k: Comm::Data, by: int%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + handle->store->decrement(key, by); + return new Val(true, TYPE_BOOL); + %} + +function Store::add_to_set%(h: opaque of Store::Handle, + k: Comm::Data, element: Comm::Data%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + auto& ele = comm::opaque_field_to_data(element->AsRecordVal(), frame); + handle->store->add_to_set(key, ele); + return new Val(true, TYPE_BOOL); + %} + +function Store::remove_from_set%(h: opaque of Store::Handle, + k: Comm::Data, element: Comm::Data%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + auto& ele = comm::opaque_field_to_data(element->AsRecordVal(), frame); + handle->store->remove_from_set(key, ele); + return new Val(true, TYPE_BOOL); + %} + +function Store::push_left%(h: opaque of Store::Handle, k: Comm::Data, + items: Comm::DataVector%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + broker::vector items_vector; + auto items_vv = items->AsVector(); + + for ( auto i = 0u; i < items_vv->size(); ++i ) + { + auto& item = comm::opaque_field_to_data((*items_vv)[i]->AsRecordVal(), + frame); + items_vector.emplace_back(item); + } + + handle->store->push_left(key, move(items_vector)); + return new Val(true, TYPE_BOOL); + %} + +function Store::push_right%(h: opaque of Store::Handle, k: Comm::Data, + items: Comm::DataVector%): bool + %{ + auto handle = static_cast(h); + + if ( ! handle->store ) + return new Val(false, TYPE_BOOL); + + auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame); + broker::vector items_vector; + auto items_vv = items->AsVector(); + + for ( auto i = 0u; i < items_vv->size(); ++i ) + { + auto& item = comm::opaque_field_to_data((*items_vv)[i]->AsRecordVal(), + frame); + items_vector.emplace_back(item); + } + + handle->store->push_right(key, move(items_vector)); + return new Val(true, TYPE_BOOL); + %} + +########################## +# non-blocking query API # +########################## + +%%{ +static bool prepare_for_query(Val* opaque, Frame* frame, + comm::StoreHandleVal** handle, + double* timeout, + comm::StoreQueryCallback** cb) + { + *handle = static_cast(opaque); + + if ( ! (*handle)->store ) + return false; + + Trigger* trigger = frame->GetTrigger(); + + if ( ! trigger ) + { + reporter->PushLocation(frame->GetCall()->GetLocationInfo()); + reporter->Error("Store queries can only be called inside when-condition"); + reporter->PopLocation(); + return false; + } + + *timeout = trigger->TimeoutValue(); + + if ( *timeout < 0 ) + { + reporter->PushLocation(frame->GetCall()->GetLocationInfo()); + reporter->Error("Store queries must specify a timeout block"); + reporter->PopLocation(); + return false; + } + + frame->SetDelayed(); + trigger->Hold(); + *cb = new comm::StoreQueryCallback(trigger, frame->GetCall(), + (*handle)->store->id()); + comm_mgr->TrackStoreQuery(*cb); + return true; + } + +%%} + +function Store::pop_left%(h: opaque of Store::Handle, + k: Comm::Data%): Store::QueryResult + %{ + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + + Val* key = k->AsRecordVal()->Lookup(0); + + if ( ! key ) + return comm::query_result(); + + handle->store->pop_left(static_cast(key)->data, + std::chrono::duration(timeout), cb); + return 0; + %} + +function Store::pop_right%(h: opaque of Store::Handle, + k: Comm::Data%): Store::QueryResult + %{ + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + + Val* key = k->AsRecordVal()->Lookup(0); + + if ( ! key ) + return comm::query_result(); + + handle->store->pop_right(static_cast(key)->data, + std::chrono::duration(timeout), cb); + return 0; + %} + +function Store::lookup%(h: opaque of Store::Handle, + k: Comm::Data%): Store::QueryResult + %{ + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + + Val* key = k->AsRecordVal()->Lookup(0); + + if ( ! key ) + return comm::query_result(); + + handle->store->lookup(static_cast(key)->data, + std::chrono::duration(timeout), cb); + return 0; + %} + +function Store::exists%(h: opaque of Store::Handle, + k: Comm::Data%): Store::QueryResult + %{ + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + + Val* key = k->AsRecordVal()->Lookup(0); + + if ( ! key ) + return comm::query_result(); + + handle->store->exists(static_cast(key)->data, + std::chrono::duration(timeout), cb); + return 0; + %} + +function Store::keys%(h: opaque of Store::Handle%): Store::QueryResult + %{ + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + + handle->store->keys(std::chrono::duration(timeout), cb); + return 0; + %} + +function Store::size%(h: opaque of Store::Handle%): Store::QueryResult + %{ + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + + handle->store->size(std::chrono::duration(timeout), cb); + return 0; + %}