broker integration: add distributed data store api

But haven't done the full gamut of testing on it yet.
This commit is contained in:
Jon Siwek 2015-01-30 14:39:16 -06:00
parent d2ea87735a
commit 9875f5d3eb
12 changed files with 1012 additions and 156 deletions

View file

@ -15,9 +15,11 @@ export {
d: opaque of Comm::Data &optional;
};
type DataVector: vector of Comm::Data;
type EventArgs: record {
name: string &optional; # nil for invalid event/args.
args: vector of Comm::Data;
args: DataVector;
};
type Comm::TableItem : record {
@ -25,3 +27,37 @@ export {
val: Comm::Data;
};
}
module Store;
export {
type QueryStatus: enum {
SUCCESS,
FAILURE,
};
type ExpiryTime: record {
absolute: time &optional;
since_last_modification: interval &optional;
};
type QueryResult: record {
status: Store::QueryStatus;
result: Comm::Data;
};
type SQLiteOptions: record {
path: string &default = "store.sqlite";
};
type RocksDBOptions: record {
path: string &default = "store.rocksdb";
use_merge_operator: bool &default = F;
};
type BackendOptions: record {
sqlite: SQLiteOptions &default = SQLiteOptions();
rocksdb: RocksDBOptions &default = RocksDBOptions();
};
}

View file

@ -112,6 +112,7 @@ Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts,
attached = 0;
is_return = arg_is_return;
location = arg_location;
timeout_value = -1;
++total_triggers;
@ -133,17 +134,22 @@ Trigger::Trigger(Expr* arg_cond, Stmt* arg_body, Stmt* arg_timeout_stmts,
Val* timeout_val = arg_timeout ? arg_timeout->Eval(arg_frame) : 0;
if ( timeout_val )
{
Unref(timeout_val);
timeout_value = timeout_val->AsInterval();
}
// Make sure we don't get deleted if somebody calls a method like
// Timeout() while evaluating the trigger.
Ref(this);
if ( ! Eval() && timeout_val )
if ( ! Eval() && timeout_value >= 0 )
{
timer = new TriggerTimer(timeout_val->AsInterval(), this);
timer = new TriggerTimer(timeout_value, this);
timer_mgr->Add(timer);
}
Unref(timeout_val);
Unref(this);
}

View file

@ -32,6 +32,10 @@ public:
// Executes timeout code and deletes the object.
void Timeout();
// Return the timeout interval (negative if none was specified).
double TimeoutValue() const
{ return timeout_value; }
// Called if another entity needs to complete its operations first
// in any case before this trigger can proceed.
void Hold() { delayed = true; }
@ -87,6 +91,7 @@ private:
Stmt* body;
Stmt* timeout_stmts;
Expr* timeout;
double timeout_value;
Frame* frame;
bool is_return;
const Location* location;

View file

@ -5,12 +5,20 @@ include_directories(BEFORE
${CMAKE_CURRENT_BINARY_DIR}
)
if ( ROCKSDB_INCLUDE_DIR )
add_definitions(-DHAVE_ROCKSDB)
include_directories(BEFORE ${ROCKSDB_INCLUDE_DIR})
endif ()
set(comm_SRCS
Data.cc
Manager.cc
Store.cc
)
bif_target(comm.bif)
bif_target(data.bif)
bif_target(messaging.bif)
bif_target(store.bif)
bro_add_subdir_library(comm ${comm_SRCS} ${BIF_OUTPUT_CC})
add_dependencies(bro_comm generate_outputs)

View file

@ -1,5 +1,5 @@
#include "Data.h"
#include "comm/comm.bif.h"
#include "comm/data.bif.h"
using namespace std;

View file

@ -1,12 +1,15 @@
#include "Manager.h"
#include "Data.h"
#include "Store.h"
#include <broker/broker.hh>
#include <cstdio>
#include <unistd.h>
#include "util.h"
#include "Var.h"
#include "Reporter.h"
#include "comm/comm.bif.h"
#include "comm/data.bif.h"
#include "comm/messaging.bif.h"
#include "comm/store.bif.h"
#include "logging/Manager.h"
using namespace std;
@ -17,6 +20,12 @@ int comm::Manager::send_flags_self_idx;
int comm::Manager::send_flags_peers_idx;
int comm::Manager::send_flags_unsolicited_idx;
comm::Manager::~Manager()
{
for ( auto& s : data_stores )
CloseStore(s.first);
}
bool comm::Manager::InitPreScript()
{
return true;
@ -47,6 +56,7 @@ bool comm::Manager::InitPostScript()
comm::opaque_of_table_iterator = new OpaqueType("Comm::TableIterator");
comm::opaque_of_vector_iterator = new OpaqueType("Comm::VectorIterator");
comm::opaque_of_record_iterator = new OpaqueType("Comm::RecordIterator");
comm::opaque_of_store_handle = new OpaqueType("Store::Handle");
vector_of_data_type = new VectorType(internal_type("Comm::Data")->Ref());
auto res = broker::init();
@ -385,6 +395,9 @@ void comm::Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
for ( const auto& ps : log_subscriptions )
read->Insert(ps.second.fd());
for ( const auto& s : data_stores )
read->Insert(s.second->store->responses().fd());
}
double comm::Manager::NextTimestamp(double* local_network_time)
@ -393,6 +406,49 @@ double comm::Manager::NextTimestamp(double* local_network_time)
return timer_mgr->Time();
}
struct response_converter {
using result_type = RecordVal*;
result_type operator()(bool d)
{
return comm::make_data_val(broker::data{d});
}
result_type operator()(uint64_t d)
{
return comm::make_data_val(broker::data{d});
}
result_type operator()(broker::data& d)
{
return comm::make_data_val(move(d));
}
result_type operator()(std::vector<broker::data>& d)
{
return comm::make_data_val(broker::data{move(d)});
}
result_type operator()(broker::store::snapshot& d)
{
broker::table table;
for ( auto& item : d.entries )
{
auto& key = item.first;
auto& val = item.second.item;
table[move(key)] = move(val);
}
return comm::make_data_val(broker::data{move(table)});
}
};
static RecordVal* response_to_val(broker::store::response r)
{
return broker::visit(response_converter{}, r.reply.value);
}
void comm::Manager::Process()
{
bool idle = true;
@ -624,5 +680,92 @@ void comm::Manager::Process()
}
}
for ( const auto& s : data_stores )
{
auto responses = s.second->store->responses().want_pop();
if ( responses.empty() )
continue;
idle = false;
for ( auto& response : responses )
{
auto ck = static_cast<StoreQueryCallback*>(response.cookie);
auto it = pending_queries.find(ck);
if ( it == pending_queries.end() )
{
reporter->Warning("unmatched response to query on store %s",
s.second->store->id().data());
continue;
}
auto query = *it;
switch ( response.reply.stat ) {
case broker::store::result::status::timeout:
// Fine, trigger's timeout takes care of things.
break;
case broker::store::result::status::failure:
query->Result(query_result());
break;
case broker::store::result::status::success:
query->Result(query_result(response_to_val(move(response))));
break;
default:
reporter->InternalWarning("unknown store response status: %d",
static_cast<int>(response.reply.stat));
break;
}
pending_queries.erase(it);
}
}
SetIdle(idle);
}
bool comm::Manager::AddStore(StoreHandleVal* handle)
{
if ( ! handle->store )
return false;
if ( data_stores.find(handle->store->id()) != data_stores.end() )
return false;
data_stores[handle->store->id()] = handle;
Ref(handle);
return true;
}
bool comm::Manager::CloseStore(const broker::store::identifier& id)
{
auto it = data_stores.find(id);
if ( it == data_stores.end() )
return false;
for ( auto it = pending_queries.begin(); it != pending_queries.end(); )
{
auto query = *it;
if ( query->StoreID() == id )
{
it = pending_queries.erase(it);
query->Abort();
delete query;
}
else
++it;
}
it->second->store = nullptr;
Unref(it->second);
return true;
}
bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb)
{
return pending_queries.insert(cb).second;
}

View file

@ -6,6 +6,8 @@
#include <memory>
#include <string>
#include <map>
#include <unordered_set>
#include "comm/Store.h"
#include "Reporter.h"
#include "iosource/IOSource.h"
#include "Val.h"
@ -17,8 +19,11 @@ namespace comm {
// Manages various forms of communication between peer Bro processes
// or possibly between different parts of a single Bro process.
class Manager : public iosource::IOSource {
friend class StoreHandleVal;
public:
~Manager();
bool InitPreScript();
bool InitPostScript();
@ -56,6 +61,12 @@ public:
bool UnsubscribeToLogs(const std::string& topic_prefix);
bool AddStore(StoreHandleVal* handle);
bool CloseStore(const broker::store::identifier& id);
bool TrackStoreQuery(StoreQueryCallback* cb);
static int GetFlags(Val* flags);
private:
@ -71,12 +82,18 @@ private:
const char* Tag() override
{ return "Comm::Manager"; }
broker::endpoint& Endpoint()
{ return *endpoint; }
std::unique_ptr<broker::endpoint> endpoint;
std::map<std::pair<std::string, uint16_t>, broker::peering> peers;
std::map<std::string, broker::message_queue> print_subscriptions;
std::map<std::string, broker::message_queue> event_subscriptions;
std::map<std::string, broker::message_queue> log_subscriptions;
std::map<broker::store::identifier, StoreHandleVal*> data_stores;
std::unordered_set<StoreQueryCallback*> pending_queries;
static VectorType* vector_of_data_type;
static EnumType* log_id_type;
static int send_flags_self_idx;

141
src/comm/Store.cc Normal file
View file

@ -0,0 +1,141 @@
#include "Store.h"
#include "comm/Manager.h"
#include <broker/store/master.hh>
#include <broker/store/clone.hh>
#include <broker/store/sqlite_backend.hh>
#ifdef HAVE_ROCKSDB
#include <broker/store/rocksdb_backend.hh>
#include <rocksdb/db.h>
#endif
OpaqueType* comm::opaque_of_store_handle;
comm::StoreHandleVal::StoreHandleVal(broker::store::identifier id,
comm::StoreType arg_type,
broker::util::optional<BifEnum::Store::BackendType> arg_back,
RecordVal* backend_options, std::chrono::duration<double> resync)
: OpaqueVal(opaque_of_store_handle),
store(), store_type(arg_type), backend_type(arg_back)
{
using BifEnum::Store::BackendType;
std::unique_ptr<broker::store::backend> backend;
if ( backend_type )
switch ( *backend_type ) {
case BackendType::MEMORY:
backend.reset(new broker::store::memory_backend);
break;
case BackendType::SQLITE:
{
auto sqlite = new broker::store::sqlite_backend;
std::string path = backend_options->Lookup(0)->AsRecordVal()
->Lookup(0)->AsStringVal()->CheckString();
if ( sqlite->open(path) )
backend.reset(sqlite);
else
{
reporter->Error("failed to open sqlite backend at path %s: %s",
path.data(), sqlite->last_error().data());
delete sqlite;
}
}
break;
case BackendType::ROCKSDB:
{
#ifdef HAVE_ROCKSDB
std::string path = backend_options->Lookup(1)->AsRecordVal()
->Lookup(0)->AsStringVal()->CheckString();
bool use_merge_op = backend_options->Lookup(1)->AsRecordVal()
->Lookup(1)->AsBool();
rocksdb::Options rock_op;
rock_op.create_if_missing = true;
if ( use_merge_op )
options.merge_operator.reset(new rocksdb_merge_operator);
auto rocksdb = new broker::store::rocksdb_backend;
if ( rocksdb->open(path, options).ok() )
backend.reset(rocksdb);
else
{
reporter->Error("failed to open rocksdb backend at path %s: %s",
path.data(), rocksdb->last_error().data());
delete rocksdb;
}
#else
reporter->Error("rocksdb backend support is not enabled");
#endif
}
break;
default:
reporter->FatalError("unknown data store backend: %d",
static_cast<int>(*backend_type));
}
switch ( store_type ) {
case StoreType::FRONTEND:
store.reset(new broker::store::frontend(comm_mgr->Endpoint(),
move(id)));
break;
case StoreType::MASTER:
store.reset(new broker::store::master(comm_mgr->Endpoint(),
move(id), move(backend)));
break;
case StoreType::CLONE:
store.reset(new broker::store::clone(comm_mgr->Endpoint(),
move(id), resync,
move(backend)));
break;
default:
reporter->FatalError("unknown data store type: %d",
static_cast<int>(store_type));
}
}
void comm::StoreHandleVal::ValDescribe(ODesc* d) const
{
using BifEnum::Store::BackendType;
d->Add("broker::store::");
switch ( store_type ) {
case StoreType::FRONTEND:
d->Add("frontend");
break;
case StoreType::MASTER:
d->Add("master");
break;
case StoreType::CLONE:
d->Add("clone");
break;
default:
d->Add("unknown");
}
d->Add("{");
d->Add(store->id());
if ( backend_type )
{
d->Add(", ");
switch ( *backend_type ) {
case BackendType::MEMORY:
d->Add("memory");
break;
case BackendType::SQLITE:
d->Add("sqlite");
break;
case BackendType::ROCKSDB:
d->Add("rocksdb");
break;
default:
d->Add("unknown");
}
}
d->Add("}");
}

113
src/comm/Store.h Normal file
View file

@ -0,0 +1,113 @@
#ifndef BRO_COMM_STORE_H
#define BRO_COMM_STORE_H
#include "comm/store.bif.h"
#include "comm/data.bif.h"
#include "Reporter.h"
#include "Type.h"
#include "Val.h"
#include "Trigger.h"
#include <broker/store/frontend.hh>
namespace comm {
extern OpaqueType* opaque_of_store_handle;
enum StoreType {
FRONTEND,
MASTER,
CLONE,
};
inline EnumVal* query_status(bool success)
{
static EnumType* store_query_status = nullptr;
static int success_val;
static int failure_val;
if ( ! store_query_status )
{
store_query_status = internal_type("Store::QueryStatus")->AsEnumType();
success_val = store_query_status->Lookup("Store", "SUCCESS");
failure_val = store_query_status->Lookup("Store", "FAILURE");
}
return new EnumVal(success ? success_val : failure_val, store_query_status);
}
inline RecordVal* query_result()
{
auto rval = new RecordVal(BifType::Record::Store::QueryResult);
rval->Assign(0, query_status(false));
rval->Assign(1, new RecordVal(BifType::Record::Comm::Data));
return rval;
}
inline RecordVal* query_result(RecordVal* data)
{
auto rval = new RecordVal(BifType::Record::Store::QueryResult);
rval->Assign(0, query_status(true));
rval->Assign(1, data);
return rval;
}
class StoreQueryCallback {
public:
StoreQueryCallback(Trigger* arg_trigger, const CallExpr* arg_call,
broker::store::identifier arg_store_id)
: trigger(arg_trigger), call(arg_call), store_id(move(arg_store_id))
{
Ref(trigger);
}
~StoreQueryCallback()
{
Unref(trigger);
}
void Result(RecordVal* result)
{
trigger->Cache(call, result);
trigger->Release();
Unref(result);
}
void Abort()
{
auto result = query_result();
trigger->Cache(call, result);
trigger->Release();
Unref(result);
}
const broker::store::identifier& StoreID() const
{ return store_id; }
private:
Trigger* trigger;
const CallExpr* call;
broker::store::identifier store_id;
};
class StoreHandleVal : public OpaqueVal {
public:
StoreHandleVal(broker::store::identifier id,
comm::StoreType arg_type,
broker::util::optional<BifEnum::Store::BackendType> arg_back,
RecordVal* backend_options,
std::chrono::duration<double> resync = std::chrono::seconds(1));
void ValDescribe(ODesc* d) const override;
std::unique_ptr<broker::store::frontend> store;
comm::StoreType store_type;
broker::util::optional<BifEnum::Store::BackendType> backend_type;
};
} // namespace comm
#endif // BRO_COMM_STORE_H

View file

@ -1,8 +1,8 @@
##! Functions for inspecting and manipulating broker data.
%%{
#include "comm/Manager.h"
#include "comm/Data.h"
#include "logging/Manager.h"
%%}
module Comm;
@ -25,12 +25,8 @@ enum DataType %{
RECORD,
%}
type Comm::SendFlags: record;
type Comm::Data: record;
type Comm::EventArgs: record;
type Comm::TableItem: record;
function Comm::data%(d: any%): Comm::Data
@ -507,146 +503,3 @@ function Comm::record_iterator_value%(it: opaque of Comm::RecordIterator%): Comm
rval->Assign(0, new comm::DataVal(**ri->it));
return rval;
%}
event Comm::remote_connection_established%(peer_address: string,
peer_port: port,
peer_name: string%);
event Comm::remote_connection_broken%(peer_address: string,
peer_port: port%);
event Comm::remote_connection_incompatible%(peer_address: string,
peer_port: port%);
function Comm::listen%(p: port, a: string &default = "",
reuse: bool &default = T%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("listen port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0,
reuse);
return new Val(rval, TYPE_BOOL);
%}
function Comm::connect%(a: string, p: port, retry: interval%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Connect(a->CheckString(), p->Port(),
std::chrono::duration<double>(retry));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disconnect%(a: string, p: port%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port());
return new Val(rval, TYPE_BOOL);
%}
event Comm::print_handler%(msg: string%);
function Comm::print%(topic: string, msg: string,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->Print(topic->CheckString(), msg->CheckString(),
flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_prints%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_prints%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::event_args%(...%): Comm::EventArgs
%{
auto rval = comm_mgr->MakeEventArgs(@ARGS@);
return rval;
%}
function Comm::event%(topic: string, args: Comm::EventArgs,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->Event(topic->CheckString(), args->AsRecordVal(),
flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::auto_event%(topic: string, ev: any,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->AutoEvent(topic->CheckString(), ev, flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::auto_event_stop%(topic: string, ev: any%): bool
%{
auto rval = comm_mgr->AutoEventStop(topic->CheckString(), ev);
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_events%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_events%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function
Comm::enable_remote_logs%(id: Log::ID,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(),
comm::Manager::GetFlags(flags));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disable_remote_logs%(id: Log::ID%): bool
%{
auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal());
return new Val(rval, TYPE_BOOL);
%}
function Comm::remote_logs_enabled%(id: Log::ID%): bool
%{
auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal());
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_logs%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}

156
src/comm/messaging.bif Normal file
View file

@ -0,0 +1,156 @@
##! Functions for peering and various messaging patterns (e.g. print/log/event).
%%{
#include "comm/Manager.h"
#include "logging/Manager.h"
%%}
module Comm;
type Comm::SendFlags: record;
type Comm::EventArgs: record;
event Comm::remote_connection_established%(peer_address: string,
peer_port: port,
peer_name: string%);
event Comm::remote_connection_broken%(peer_address: string,
peer_port: port%);
event Comm::remote_connection_incompatible%(peer_address: string,
peer_port: port%);
function Comm::listen%(p: port, a: string &default = "",
reuse: bool &default = T%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("listen port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Listen(p->Port(), a->Len() ? a->CheckString() : 0,
reuse);
return new Val(rval, TYPE_BOOL);
%}
function Comm::connect%(a: string, p: port, retry: interval%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Connect(a->CheckString(), p->Port(),
std::chrono::duration<double>(retry));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disconnect%(a: string, p: port%): bool
%{
if ( ! p->IsTCP() )
{
reporter->Error("remote connection port must use tcp");
return new Val(false, TYPE_BOOL);
}
auto rval = comm_mgr->Disconnect(a->CheckString(), p->Port());
return new Val(rval, TYPE_BOOL);
%}
event Comm::print_handler%(msg: string%);
function Comm::print%(topic: string, msg: string,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->Print(topic->CheckString(), msg->CheckString(),
flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_prints%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_prints%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::event_args%(...%): Comm::EventArgs
%{
auto rval = comm_mgr->MakeEventArgs(@ARGS@);
return rval;
%}
function Comm::event%(topic: string, args: Comm::EventArgs,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->Event(topic->CheckString(), args->AsRecordVal(),
flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::auto_event%(topic: string, ev: any,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = comm_mgr->AutoEvent(topic->CheckString(), ev, flags);
return new Val(rval, TYPE_BOOL);
%}
function Comm::auto_event_stop%(topic: string, ev: any%): bool
%{
auto rval = comm_mgr->AutoEventStop(topic->CheckString(), ev);
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_events%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_events%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function
Comm::enable_remote_logs%(id: Log::ID,
flags: SendFlags &default = SendFlags()%): bool
%{
auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(),
comm::Manager::GetFlags(flags));
return new Val(rval, TYPE_BOOL);
%}
function Comm::disable_remote_logs%(id: Log::ID%): bool
%{
auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal());
return new Val(rval, TYPE_BOOL);
%}
function Comm::remote_logs_enabled%(id: Log::ID%): bool
%{
auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal());
return new Val(rval, TYPE_BOOL);
%}
function Comm::subscribe_to_logs%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}
function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool
%{
auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString());
return new Val(rval, TYPE_BOOL);
%}

378
src/comm/store.bif Normal file
View file

@ -0,0 +1,378 @@
##! Functions to interface with broker's distributed data store.
%%{
#include "comm/Manager.h"
#include "comm/Store.h"
#include "comm/Data.h"
#include "Trigger.h"
%%}
module Store;
type Store::ExpiryTime: record;
type Store::QueryResult: record;
type Store::BackendOptions: record;
enum BackendType %{
MEMORY,
SQLITE,
ROCKSDB,
%}
function Store::create_master%(id: string, b: BackendType &default = MEMORY,
options: BackendOptions &default = BackendOptions()%): opaque of Store::Handle
%{
auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::MASTER,
static_cast<BifEnum::Store::BackendType>(b->AsEnum()),
options->AsRecordVal());
comm_mgr->AddStore(rval);
return rval;
%}
function Store::create_clone%(id: string, b: BackendType &default = MEMORY,
options: BackendOptions &default = BackendOptions(),
resync: interval &default = 1sec%): opaque of Store::Handle
%{
auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::CLONE,
static_cast<BifEnum::Store::BackendType>(b->AsEnum()),
options->AsRecordVal(),
std::chrono::duration<double>(resync));
comm_mgr->AddStore(rval);
return rval;
%}
function Store::create_frontend%(id: string%): opaque of Store::Handle
%{
auto rval = new comm::StoreHandleVal(id->CheckString(), comm::StoreType::FRONTEND,
{}, nullptr);
comm_mgr->AddStore(rval);
return rval;
%}
function Store::close_by_name%(id: string%): bool
%{
return new Val(comm_mgr->CloseStore(id->CheckString()), TYPE_BOOL);
%}
function Store::close_by_handle%(h: opaque of Store::Handle%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
return new Val(comm_mgr->CloseStore(handle->store->id()), TYPE_BOOL);
%}
###########################
# non-blocking update API #
###########################
function Store::insert%(h: opaque of Store::Handle,
k: Comm::Data, v: Comm::Data,
e: Store::ExpiryTime &default = Store::ExpiryTime()%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
auto& val = comm::opaque_field_to_data(v->AsRecordVal(), frame);
broker::util::optional<broker::store::expiration_time> expiry;
auto abs_expiry_val = e->AsRecordVal()->Lookup(0);
auto rel_expiry_val = e->AsRecordVal()->Lookup(1);
if ( abs_expiry_val )
{
auto tag = broker::store::expiration_time::tag::absolute;
expiry = broker::store::expiration_time(abs_expiry_val->AsTime(), tag);
}
else if ( rel_expiry_val )
{
auto tag = broker::store::expiration_time::tag::since_last_modification;
expiry = broker::store::expiration_time(rel_expiry_val->AsInterval(), tag);
}
handle->store->insert(key, val, expiry);
return new Val(true, TYPE_BOOL);
%}
function Store::erase%(h: opaque of Store::Handle, k: Comm::Data%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
handle->store->erase(key);
return new Val(true, TYPE_BOOL);
%}
function Store::clear%(h: opaque of Store::Handle%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
handle->store->clear();
return new Val(true, TYPE_BOOL);
%}
function Store::increment%(h: opaque of Store::Handle,
k: Comm::Data, by: int%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
handle->store->increment(key, by);
return new Val(true, TYPE_BOOL);
%}
function Store::decrement%(h: opaque of Store::Handle,
k: Comm::Data, by: int%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
handle->store->decrement(key, by);
return new Val(true, TYPE_BOOL);
%}
function Store::add_to_set%(h: opaque of Store::Handle,
k: Comm::Data, element: Comm::Data%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
auto& ele = comm::opaque_field_to_data(element->AsRecordVal(), frame);
handle->store->add_to_set(key, ele);
return new Val(true, TYPE_BOOL);
%}
function Store::remove_from_set%(h: opaque of Store::Handle,
k: Comm::Data, element: Comm::Data%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
auto& ele = comm::opaque_field_to_data(element->AsRecordVal(), frame);
handle->store->remove_from_set(key, ele);
return new Val(true, TYPE_BOOL);
%}
function Store::push_left%(h: opaque of Store::Handle, k: Comm::Data,
items: Comm::DataVector%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
broker::vector items_vector;
auto items_vv = items->AsVector();
for ( auto i = 0u; i < items_vv->size(); ++i )
{
auto& item = comm::opaque_field_to_data((*items_vv)[i]->AsRecordVal(),
frame);
items_vector.emplace_back(item);
}
handle->store->push_left(key, move(items_vector));
return new Val(true, TYPE_BOOL);
%}
function Store::push_right%(h: opaque of Store::Handle, k: Comm::Data,
items: Comm::DataVector%): bool
%{
auto handle = static_cast<comm::StoreHandleVal*>(h);
if ( ! handle->store )
return new Val(false, TYPE_BOOL);
auto& key = comm::opaque_field_to_data(k->AsRecordVal(), frame);
broker::vector items_vector;
auto items_vv = items->AsVector();
for ( auto i = 0u; i < items_vv->size(); ++i )
{
auto& item = comm::opaque_field_to_data((*items_vv)[i]->AsRecordVal(),
frame);
items_vector.emplace_back(item);
}
handle->store->push_right(key, move(items_vector));
return new Val(true, TYPE_BOOL);
%}
##########################
# non-blocking query API #
##########################
%%{
static bool prepare_for_query(Val* opaque, Frame* frame,
comm::StoreHandleVal** handle,
double* timeout,
comm::StoreQueryCallback** cb)
{
*handle = static_cast<comm::StoreHandleVal*>(opaque);
if ( ! (*handle)->store )
return false;
Trigger* trigger = frame->GetTrigger();
if ( ! trigger )
{
reporter->PushLocation(frame->GetCall()->GetLocationInfo());
reporter->Error("Store queries can only be called inside when-condition");
reporter->PopLocation();
return false;
}
*timeout = trigger->TimeoutValue();
if ( *timeout < 0 )
{
reporter->PushLocation(frame->GetCall()->GetLocationInfo());
reporter->Error("Store queries must specify a timeout block");
reporter->PopLocation();
return false;
}
frame->SetDelayed();
trigger->Hold();
*cb = new comm::StoreQueryCallback(trigger, frame->GetCall(),
(*handle)->store->id());
comm_mgr->TrackStoreQuery(*cb);
return true;
}
%%}
function Store::pop_left%(h: opaque of Store::Handle,
k: Comm::Data%): Store::QueryResult
%{
double timeout;
comm::StoreQueryCallback* cb;
comm::StoreHandleVal* handle;
if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) )
return comm::query_result();
Val* key = k->AsRecordVal()->Lookup(0);
if ( ! key )
return comm::query_result();
handle->store->pop_left(static_cast<comm::DataVal*>(key)->data,
std::chrono::duration<double>(timeout), cb);
return 0;
%}
function Store::pop_right%(h: opaque of Store::Handle,
k: Comm::Data%): Store::QueryResult
%{
double timeout;
comm::StoreQueryCallback* cb;
comm::StoreHandleVal* handle;
if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) )
return comm::query_result();
Val* key = k->AsRecordVal()->Lookup(0);
if ( ! key )
return comm::query_result();
handle->store->pop_right(static_cast<comm::DataVal*>(key)->data,
std::chrono::duration<double>(timeout), cb);
return 0;
%}
function Store::lookup%(h: opaque of Store::Handle,
k: Comm::Data%): Store::QueryResult
%{
double timeout;
comm::StoreQueryCallback* cb;
comm::StoreHandleVal* handle;
if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) )
return comm::query_result();
Val* key = k->AsRecordVal()->Lookup(0);
if ( ! key )
return comm::query_result();
handle->store->lookup(static_cast<comm::DataVal*>(key)->data,
std::chrono::duration<double>(timeout), cb);
return 0;
%}
function Store::exists%(h: opaque of Store::Handle,
k: Comm::Data%): Store::QueryResult
%{
double timeout;
comm::StoreQueryCallback* cb;
comm::StoreHandleVal* handle;
if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) )
return comm::query_result();
Val* key = k->AsRecordVal()->Lookup(0);
if ( ! key )
return comm::query_result();
handle->store->exists(static_cast<comm::DataVal*>(key)->data,
std::chrono::duration<double>(timeout), cb);
return 0;
%}
function Store::keys%(h: opaque of Store::Handle%): Store::QueryResult
%{
double timeout;
comm::StoreQueryCallback* cb;
comm::StoreHandleVal* handle;
if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) )
return comm::query_result();
handle->store->keys(std::chrono::duration<double>(timeout), cb);
return 0;
%}
function Store::size%(h: opaque of Store::Handle%): Store::QueryResult
%{
double timeout;
comm::StoreQueryCallback* cb;
comm::StoreHandleVal* handle;
if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) )
return comm::query_result();
handle->store->size(std::chrono::duration<double>(timeout), cb);
return 0;
%}