diff --git a/scripts/base/frameworks/comm/main.bro b/scripts/base/frameworks/comm/main.bro index 66dc1715f4..da910f20bf 100644 --- a/scripts/base/frameworks/comm/main.bro +++ b/scripts/base/frameworks/comm/main.bro @@ -1,32 +1,53 @@ +##! Various data structure definitions for use with Bro's communication system. module Comm; export { + ## A name used to identify this endpoint to peers. + ## .. bro:see:: Comm::connect Comm::listen const endpoint_name = "" &redef; + ## Change communication behavior. type EndpointFlags: record { + ## Whether to restrict message topics that can be published to peers. auto_publish: bool &default = T; + ## Whether to restrict what message topics or data store identifiers + ## the local endpoint advertises to peers (e.g. subscribing to + ## events or making a master data store available). auto_advertise: bool &default = T; }; + ## Fine-grained tuning of communication behavior for a particular message. type SendFlags: record { + ## Send the message to the local endpoint. self: bool &default = F; + ## Send the message to peer endpoints that advertise interest in + ## the topic associated with the message. peers: bool &default = T; + ## Send the message to peer endpoints even if they don't advertise + ## interest in the topic associated with the message. unsolicited: bool &default = F; }; + ## Opaque communication data. type Data: record { d: opaque of Comm::Data &optional; }; + ## Opaque communication data. type DataVector: vector of Comm::Data; + ## Opaque event communication data. type EventArgs: record { - name: string &optional; # nil for invalid event/args. + ## The name of the event. Not set if invalid event or arguments. + name: string &optional; + ## The arguments to the event. args: DataVector; }; + ## Opaque communication data used as a convenient way to wrap key-value + ## pairs that comprise table entries. type Comm::TableItem : record { key: Comm::Data; val: Comm::Data; @@ -37,30 +58,44 @@ module Store; export { + ## Whether a data store query could be completed or not. type QueryStatus: enum { SUCCESS, FAILURE, }; + ## A expiry time for a key-value pair inserted in to a data store. type ExpiryTime: record { + ## Absolute point in time at which to expire the entry. absolute: time &optional; + ## A point in time relative to the last modification time at which + ## to expire the entry. New modifications will delay the expiration. since_last_modification: interval &optional; }; + ## The result of a data store query. type QueryResult: record { + ## Whether the query completed or not. status: Store::QueryStatus; + ## The result of the query. Certain queries may use a particular + ## data type (e.g. querying store size always returns a count, but + ## a lookup may return various data types). result: Comm::Data; }; + ## Options to tune the SQLite storage backend. type SQLiteOptions: record { + ## File system path of the database. path: string &default = "store.sqlite"; }; + ## Options to tune the RocksDB storage backend. type RocksDBOptions: record { + ## File system path of the database. path: string &default = "store.rocksdb"; - use_merge_operator: bool &default = F; }; + ## Options to tune the particular storage backends. type BackendOptions: record { sqlite: SQLiteOptions &default = SQLiteOptions(); rocksdb: RocksDBOptions &default = RocksDBOptions(); diff --git a/src/Trigger.h b/src/Trigger.h index 7662901dc5..3af9ddf1b0 100644 --- a/src/Trigger.h +++ b/src/Trigger.h @@ -55,6 +55,8 @@ public: // may not immediately delete it as other references may still exist. void Disable(); + bool Disabled() const { return disabled; } + virtual void Describe(ODesc* d) const { d->Add(""); } // Overidden from Notifier. We queue the trigger and evaluate it diff --git a/src/comm/Data.h b/src/comm/Data.h index ed3c16f677..ef7b15110d 100644 --- a/src/comm/Data.h +++ b/src/comm/Data.h @@ -15,18 +15,53 @@ extern OpaqueType* opaque_of_table_iterator; extern OpaqueType* opaque_of_vector_iterator; extern OpaqueType* opaque_of_record_iterator; +/** + * Convert a broker port protocol to a bro port protocol. + */ TransportProto to_bro_port_proto(broker::port::protocol tp); +/** + * Create a Comm::Data value from a Bro value. + * @param v the Bro value to convert to a Broker data value. + * @return a Comm::Data value, where the optional field is set if the conversion + * was possible, else it is unset. + */ RecordVal* make_data_val(Val* v); +/** + * Create a Comm::Data value from a Broker data value. + * @param d the Broker value to wrap in an opaque type. + * @return a Comm::Data value that wraps the Broker value. + */ RecordVal* make_data_val(broker::data d); +/** + * Get the type of Broker data that Comm::Data wraps. + * @param v a Comm::Data value. + * @param frame used to get location info upon error. + * @return a Comm::DataType value. + */ EnumVal* get_data_type(RecordVal* v, Frame* frame); +/** + * Convert a Bro value to a Broker data value. + * @param v a Bro value. + * @return a Broker data value if the Bro value could be converted to one. + */ broker::util::optional val_to_data(Val* v); +/** + * Convert a Broker data value to a Bro value. + * @param d a Broker data value. + * @param type the expected type of the value to return. + * @return a pointer to a new Bro value or a nullptr if the conversion was not + * possible. + */ Val* data_to_val(broker::data d, BroType* type); +/** + * A Bro value which wraps a Broker data value. + */ class DataVal : public OpaqueVal { public: @@ -51,6 +86,9 @@ protected: {} }; +/** + * Visitor for retrieving type names a Broker data value. + */ struct type_name_getter { using result_type = const char*; @@ -100,8 +138,25 @@ struct type_name_getter { { return "record"; } }; +/** + * Retrieve Broker data value associated with a Comm::Data Bro value. + * @param v a Comm::Data value. + * @param f used to get location information on error. + * @return a reference to the wrapped Broker data value. A runtime interpreter + * exception is thrown if the the optional opaque value of \a v is not set. + */ broker::data& opaque_field_to_data(RecordVal* v, Frame* f); +/** + * Retrieve variant data from a Broker data value. + * @tparam T a type that the variant may contain. + * @param d a Broker data value to get variant data out of. + * @param tag a Bro tag which corresponds to T (just used for error reporting). + * @param f used to get location information on error. + * @return a refrence to the requested type in the variant Broker data. + * A runtime interpret exception is thrown if trying to access a type which + * is not currently stored in the Broker data. + */ template T& require_data_type(broker::data& d, TypeTag tag, Frame* f) { @@ -116,12 +171,24 @@ T& require_data_type(broker::data& d, TypeTag tag, Frame* f) return *ptr; } +/** + * @see require_data_type() and opaque_field_to_data(). + */ template inline T& require_data_type(RecordVal* v, TypeTag tag, Frame* f) { return require_data_type(opaque_field_to_data(v, f), tag, f); } +/** + * Convert a Comm::Data Bro value to a Bro value of a given type. + * @tparam a type that a Broker data variant may contain. + * @param v a Comm::Data value. + * @param tag a Bro type to convert to. + * @param f used to get location information on error. + * A runtime interpret exception is thrown if trying to access a type which + * is not currently stored in the Broker data. + */ template inline Val* refine(RecordVal* v, TypeTag tag, Frame* f) { diff --git a/src/comm/Manager.cc b/src/comm/Manager.cc index 92b2c167dd..65a7bddbf6 100644 --- a/src/comm/Manager.cc +++ b/src/comm/Manager.cc @@ -41,7 +41,7 @@ static int require_field(RecordType* rt, const char* name) return rval; } -static int GetEndpointFlags(Val* broker_endpoint_flags) +static int endpoint_flags_to_int(Val* broker_endpoint_flags) { int rval = 0; auto r = broker_endpoint_flags->AsRecordVal(); @@ -111,7 +111,7 @@ bool comm::Manager::Enable(Val* broker_endpoint_flags) name = fmt("bro@.%ld", static_cast(getpid())); } - int flags = GetEndpointFlags(broker_endpoint_flags); + int flags = endpoint_flags_to_int(broker_endpoint_flags); endpoint = unique_ptr(new broker::endpoint(name, flags)); iosource_mgr->Register(this, true); return true; @@ -122,7 +122,7 @@ bool comm::Manager::SetEndpointFlags(Val* broker_endpoint_flags) if ( ! Enabled() ) return false; - int flags = GetEndpointFlags(broker_endpoint_flags); + int flags = endpoint_flags_to_int(broker_endpoint_flags); endpoint->set_flags(flags); return true; } @@ -179,7 +179,8 @@ bool comm::Manager::Print(string topic, string msg, Val* flags) if ( ! Enabled() ) return false; - endpoint->send(move(topic), broker::message{move(msg)}, GetFlags(flags)); + endpoint->send(move(topic), broker::message{move(msg)}, + send_flags_to_int(flags)); return true; } @@ -243,7 +244,7 @@ bool comm::Manager::Event(std::string topic, RecordVal* args, Val* flags) msg.emplace_back(data_val->data); } - endpoint->send(move(topic), move(msg), GetFlags(flags)); + endpoint->send(move(topic), move(msg), send_flags_to_int(flags)); return true; } @@ -275,7 +276,7 @@ bool comm::Manager::AutoEvent(string topic, Val* event, Val* flags) return false; } - handler->AutoRemote(move(topic), GetFlags(flags)); + handler->AutoRemote(move(topic), send_flags_to_int(flags)); return true; } @@ -484,7 +485,7 @@ bool comm::Manager::UnadvertiseTopic(broker::topic t) return true; } -int comm::Manager::GetFlags(Val* flags) +int comm::Manager::send_flags_to_int(Val* flags) { auto r = flags->AsRecordVal(); int rval = 0; @@ -869,6 +870,14 @@ void comm::Manager::Process() auto query = *it; + if ( query->Disabled() ) + { + // Trigger timer must have timed the query out already. + delete query; + pending_queries.erase(it); + continue; + } + switch ( response.reply.stat ) { case broker::store::result::status::timeout: // Fine, trigger's timeout takes care of things. @@ -885,6 +894,7 @@ void comm::Manager::Process() break; } + delete query; pending_queries.erase(it); } } @@ -1006,8 +1016,6 @@ bool comm::Manager::CloseStore(const broker::store::identifier& id, bool comm::Manager::TrackStoreQuery(StoreQueryCallback* cb) { - if ( ! Enabled() ) - return false; - + assert(Enabled()); return pending_queries.insert(cb).second; } diff --git a/src/comm/Manager.h b/src/comm/Manager.h index e8a8d5e5b1..bd1236bf34 100644 --- a/src/comm/Manager.h +++ b/src/comm/Manager.h @@ -14,73 +14,275 @@ namespace comm { -// TODO: documentation - -// Manages various forms of communication between peer Bro processes -// or possibly between different parts of a single Bro process. +/** + * Manages various forms of communication between peer Bro processes + * or other external applications via use of the Broker messaging library. + */ class Manager : public iosource::IOSource { friend class StoreHandleVal; public: + /** + * Destructor. Any still-pending data store queries are aborted. + */ ~Manager(); + /** + * Enable use of communication. + * @param flags used to tune the local Broker endpoint's behavior. + * See the Comm::EndpointFlags record type. + * @return true if communication is successfully initialized. + */ bool Enable(Val* flags); + /** + * Changes endpoint flags originally supplied to comm::Manager::Enable(). + * @param flags the new behavior flags to use. + * @return true if flags were changed. + */ bool SetEndpointFlags(Val* flags); + /** + * @return true if comm::Manager::Enable() has previously been called and + * it succeeded. + */ bool Enabled() { return endpoint != nullptr; } + /** + * Listen for remote connections. + * @param port the TCP port to listen on. + * @param addr an address string on which to accept connections, e.g. + * "127.0.0.1". A nullptr refers to @p INADDR_ANY. + * @param reuse_addr equivalent to behavior of SO_REUSEADDR. + * @return true if the local endpoint is now listening for connections. + */ bool Listen(uint16_t port, const char* addr = nullptr, bool reuse_addr = true); + /** + * Initiate a remote connection. + * @param addr an address to connect to, e.g. "localhost" or "127.0.0.1". + * @param port the TCP port on which the remote side is listening. + * @param retry_interval an interval at which to retry establishing the + * connection with the remote peer. + * @return true if it's possible to try connecting with the peer and + * it's a new peer. The actual connection may not be established until a + * later point in time. + */ bool Connect(std::string addr, uint16_t port, std::chrono::duration retry_interval); + /** + * Remove a remote connection. + * @param addr the address used in comm::Manager::Connect(). + * @param port the port used in comm::Manager::Connect(). + * @return true if the arguments match a previously successful call to + * comm::Manager::Connect(). + */ bool Disconnect(const std::string& addr, uint16_t port); + /** + * Print a simple message to any interested peers. + * @param topic a topic string associated with the print message. + * Peers advertise interest by registering a subscription to some prefix + * of this topic name. + * @param msg the string to send to peers. + * @param flags tune the behavior of how the message is send. + * See the Comm::SendFlags record type. + * @return true if the message is sent successfully. + */ bool Print(std::string topic, std::string msg, Val* flags); + /** + * Send an event to any interested peers. + * @param topic a topic string associated with the print message. + * Peers advertise interest by registering a subscription to some prefix + * of this topic name. + * @param msg the event to send to peers, which is the name of the event + * as a string followed by all of its arguments. + * @param flags tune the behavior of how the message is send. + * See the Comm::SendFlags record type. + * @return true if the message is sent successfully. + */ bool Event(std::string topic, broker::message msg, int flags); + + /** + * Send an event to any interested peers. + * @param topic a topic string associated with the print message. + * Peers advertise interest by registering a subscription to some prefix + * of this topic name. + * @param args the event and its arguments to send to peers. See the + * Comm::EventArgs record type. + * @param flags tune the behavior of how the message is send. + * See the Comm::SendFlags record type. + * @return true if the message is sent successfully. + */ bool Event(std::string topic, RecordVal* args, Val* flags); + /** + * Send a log entry to any interested peers. The topic name used is + * implicitly "bro/log/". + * @param stream_id the stream to which the log entry belongs. + * @param columns the data which comprises the log entry. + * @param flags tune the behavior of how the message is send. + * See the Comm::SendFlags record type. + * @return true if the message is sent successfully. + */ bool Log(EnumVal* stream_id, RecordVal* columns, int flags); + /** + * Automatically send an event to any interested peers whenever it is + * locally dispatched (e.g. using "event my_event(...);" in a script). + * @param topic a topic string associated with the event message. + * Peers advertise interest by registering a subscription to some prefix + * of this topic name. + * @param event a Bro event value. + * @param flags tune the behavior of how the message is send. + * See the Comm::SendFlags record type. + * @return true if automatic event sending is now enabled. + */ bool AutoEvent(std::string topic, Val* event, Val* flags); + /** + * Stop automatically sending an event to peers upon local dispatch. + * @param topic a topic originally given to comm::Manager::AutoEvent(). + * @param event an event originally given to comm::Manager::AutoEvent(). + * @return true if automatic events will no occur for the topic/event pair. + */ bool AutoEventStop(const std::string& topic, Val* event); + /** + * Create an EventArgs record value from an event and its arguments. + * @param args the event and its arguments. The event is always the first + * elements in the list. + * @return an EventArgs record value. If an invalid event or arguments + * were supplied the optional "name" field will not be set. + */ RecordVal* MakeEventArgs(val_list* args); + /** + * Register interest in peer print messages that use a certain topic prefix. + * @param topic_prefix a prefix to match against remote message topics. + * e.g. an empty prefix will match everything and "a" will match "alice" + * and "amy" but not "bob". + * @return true if it's a new print subscriptions and it is now registered. + */ bool SubscribeToPrints(std::string topic_prefix); + /** + * Unregister interest in peer print messages. + * @param topic_prefix a prefix previously supplied to a successful call + * to comm::Manager::SubscribeToPrints(). + * @return true if interest in topic prefix is no longer advertised. + */ bool UnsubscribeToPrints(const std::string& topic_prefix); + /** + * Register interest in peer event messages that use a certain topic prefix. + * @param topic_prefix a prefix to match against remote message topics. + * e.g. an empty prefix will match everything and "a" will match "alice" + * and "amy" but not "bob". + * @return true if it's a new event subscription and it is now registered. + */ bool SubscribeToEvents(std::string topic_prefix); + /** + * Unregister interest in peer event messages. + * @param topic_prefix a prefix previously supplied to a successful call + * to comm::Manager::SubscribeToEvents(). + * @return true if interest in topic prefix is no longer advertised. + */ bool UnsubscribeToEvents(const std::string& topic_prefix); + /** + * Register interest in peer log messages that use a certain topic prefix. + * @param topic_prefix a prefix to match against remote message topics. + * e.g. an empty prefix will match everything and "a" will match "alice" + * and "amy" but not "bob". + * @return true if it's a new log subscription and it is now registered. + */ bool SubscribeToLogs(std::string topic_prefix); + /** + * Unregister interest in peer log messages. + * @param topic_prefix a prefix previously supplied to a successful call + * to comm::Manager::SubscribeToLogs(). + * @return true if interest in topic prefix is no longer advertised. + */ bool UnsubscribeToLogs(const std::string& topic_prefix); + /** + * Allow sending messages to peers if associated with the given topic. + * This has no effect if auto publication behavior is enabled via the flags + * supplied to comm::Manager::Enable() or comm::Manager::SetEndpointFlags(). + * @param t a topic to allow messages to be published under. + * @return true if successful. + */ bool PublishTopic(broker::topic t); + /** + * Disallow sending messages to peers if associated with the given topic. + * This has no effect if auto publication behavior is enabled via the flags + * supplied to comm::Manager::Enable() or comm::Manager::SetEndpointFlags(). + * @param t a topic to disallow messages to be published under. + * @return true if successful. + */ bool UnpublishTopic(broker::topic t); + /** + * Allow advertising interest in the given topic to peers. + * This has no effect if auto advertise behavior is enabled via the flags + * supplied to comm::Manager::Enable() or comm::Manager::SetEndpointFlags(). + * @param t a topic to allow advertising interest/subscription to peers. + * @return true if successful. + */ bool AdvertiseTopic(broker::topic t); + /** + * Disallow advertising interest in the given topic to peers. + * This has no effect if auto advertise behavior is enabled via the flags + * supplied to comm::Manager::Enable() or comm::Manager::SetEndpointFlags(). + * @param t a topic to disallow advertising interest/subscription to peers. + * @return true if successful. + */ bool UnadvertiseTopic(broker::topic t); + /** + * Register the availability of a data store. + * @param handle the data store. + * @return true if the store was valid and not already away of it. + */ bool AddStore(StoreHandleVal* handle); + /** + * Lookup a data store by it's identifier name and type. + * @param id the store's name. + * @param type the type of data store. + * @return a pointer to the store handle if it exists else nullptr. + */ StoreHandleVal* LookupStore(const broker::store::identifier& id, StoreType type); + /** + * Close and unregister a data store. Any existing references to the + * store handle will not be able to be used for any data store operations. + * @param id the stores' name. + * @param type the type of the data store. + * @return true if such a store existed and is now closed. + */ bool CloseStore(const broker::store::identifier& id, StoreType type); + /** + * Register a data store query callback. + * @param cb the callback info to use when the query completes or times out. + * @return true if now tracking a data store query. + */ bool TrackStoreQuery(StoreQueryCallback* cb); - static int GetFlags(Val* flags); + /** + * Convert Comm::SendFlags to int flags for use with broker::send(). + */ + static int send_flags_to_int(Val* flags); private: diff --git a/src/comm/Store.cc b/src/comm/Store.cc index 8c55c31785..5fcc7daa85 100644 --- a/src/comm/Store.cc +++ b/src/comm/Store.cc @@ -48,14 +48,9 @@ comm::StoreHandleVal::StoreHandleVal(broker::store::identifier id, #ifdef HAVE_ROCKSDB std::string path = backend_options->Lookup(1)->AsRecordVal() ->Lookup(0)->AsStringVal()->CheckString(); - bool use_merge_op = backend_options->Lookup(1)->AsRecordVal() - ->Lookup(1)->AsBool(); rocksdb::Options rock_op; rock_op.create_if_missing = true; - if ( use_merge_op ) - options.merge_operator.reset(new rocksdb_merge_operator); - auto rocksdb = new broker::store::rocksdb_backend; if ( rocksdb->open(path, options).ok() ) diff --git a/src/comm/Store.h b/src/comm/Store.h index b02c5b4f5b..289290eab4 100644 --- a/src/comm/Store.h +++ b/src/comm/Store.h @@ -14,12 +14,21 @@ namespace comm { extern OpaqueType* opaque_of_store_handle; +/** + * Enumerates the possible types of data stores. + */ enum StoreType { + // Just a view in to a remote store, contains no data itself. FRONTEND, MASTER, CLONE, }; +/** + * Create a Store::QueryStatus value. + * @param success whether the query status should be set to success or failure. + * @return a Store::QueryStatus value. + */ inline EnumVal* query_status(bool success) { static EnumType* store_query_status = nullptr; @@ -36,6 +45,10 @@ inline EnumVal* query_status(bool success) return new EnumVal(success ? success_val : failure_val, store_query_status); } +/** + * @return a Store::QueryResult value that has a Store::QueryStatus indicating + * a failure. + */ inline RecordVal* query_result() { auto rval = new RecordVal(BifType::Record::Store::QueryResult); @@ -44,6 +57,11 @@ inline RecordVal* query_result() return rval; } +/** + * @param data the result of the query. + * @return a Store::QueryResult value that has a Store::QueryStatus indicating + * a success. + */ inline RecordVal* query_result(RecordVal* data) { auto rval = new RecordVal(BifType::Record::Store::QueryResult); @@ -52,6 +70,9 @@ inline RecordVal* query_result(RecordVal* data) return rval; } +/** + * Used for asynchronous data store queries which use "when" statements. + */ class StoreQueryCallback { public: @@ -84,6 +105,9 @@ public: Unref(result); } + bool Disabled() const + { return trigger->Disabled(); } + const broker::store::identifier& StoreID() const { return store_id; } @@ -98,6 +122,9 @@ private: StoreType store_type; }; +/** + * An opaque handle which wraps a Broker data store. + */ class StoreHandleVal : public OpaqueVal { public: diff --git a/src/comm/comm.bif b/src/comm/comm.bif index 1d41b572f6..23e163c748 100644 --- a/src/comm/comm.bif +++ b/src/comm/comm.bif @@ -9,50 +9,133 @@ module Comm; type Comm::EndpointFlags: record; +## Enable use of communication. +## +## flags: used to tune the local Broker endpoint behavior. +## +## Returns: true if communication is successfully initialized. function Comm::enable%(flags: EndpointFlags &default = EndpointFlags()%): bool %{ return new Val(comm_mgr->Enable(flags), TYPE_BOOL); %} +## Changes endpoint flags originally supplied to :bro:see:`Comm::enable`. +## +## flags: the new endpoint behavior flags to use. +## +## Returns: true of flags were changed. function Comm::set_endpoint_flags%(flags: EndpointFlags &default = EndpointFlags()%): bool %{ return new Val(comm_mgr->SetEndpointFlags(flags), TYPE_BOOL); %} +## Allow sending messages to peers if associated with the given topic. +## This has no effect if auto publication behavior is enabled via the flags +## supplied to :bro:see:`Comm::enable` or :bro:see:`Comm::set_endpoint_flags`. +## +## topic: a topic to allow messages to be published under. +## +## Returns: true if successful. function Comm::publish_topic%(topic: string%): bool %{ return new Val(comm_mgr->PublishTopic(topic->CheckString()), TYPE_BOOL); %} +## Disallow sending messages to peers if associated with the given topic. +## This has no effect if auto publication behavior is enabled via the flags +## supplied to :bro:see:`Comm::enable` or :bro:see:`Comm::set_endpoint_flags`. +## +## topic: a topic to disallow messages to be published under. +## +## Returns: true if successful. function Comm::unpublish_topic%(topic: string%): bool %{ return new Val(comm_mgr->UnpublishTopic(topic->CheckString()), TYPE_BOOL); %} +## Allow advertising interest in the given topic to peers. +## This has no effect if auto advertise behavior is enabled via the flags +## supplied to :bro:see:`Comm::enable` or :bro:see:`Comm::set_endpoint_flags`. +## +## topic: a topic to allow advertising interest/subscription to peers. +## +## Returns: true if successful. function Comm::advertise_topic%(topic: string%): bool %{ return new Val(comm_mgr->AdvertiseTopic(topic->CheckString()), TYPE_BOOL); %} +## Disallow advertising interest in the given topic to peers. +## This has no effect if auto advertise behavior is enabled via the flags +## supplied to :bro:see:`Comm::enable` or :bro:see:`Comm::set_endpoint_flags`. +## +## topic: a topic to disallow advertising interest/subscription to peers. +## +## Returns: true if successful. function Comm::unadvertise_topic%(topic: string%): bool %{ return new Val(comm_mgr->UnadvertiseTopic(topic->CheckString()), TYPE_BOOL); %} +## Generated when a connection has been established due to a previous call +## to :bro:see:`Comm::connect`. +## +## peer_address: the address used to connect to the peer. +## +## peer_port: the port used to connect to the peer. +## +## peer_name: the name by which the peer identified itself. event Comm::outgoing_connection_established%(peer_address: string, peer_port: port, peer_name: string%); +## Generated when a previously established connection becomes broken. +## Reconnection will automatically be attempted at a frequency given +## by the original call to :bro:see:`Comm::connect`. +## +## peer_address: the address used to connect to the peer. +## +## peer_port: the port used to connect to the peer. +## +## .. bro:see:: Comm::outgoing_connection_established event Comm::outgoing_connection_broken%(peer_address: string, peer_port: port%); +## Generated when a connection via :bro:see:`Comm::connect` has failed +## because the remote side is incompatible. +## +## peer_address: the address used to connect to the peer. +## +## peer_port: the port used to connect to the peer. event Comm::outgoing_connection_incompatible%(peer_address: string, peer_port: port%); +## Generated when a peer has established a connection with this process +## as a result of previously performing a :bro:see:`Comm::listen`. +## +## peer_name: the name by which the peer identified itself. event Comm::incoming_connection_established%(peer_name: string%); +## Generated when a peer that previously established a connection with this +## process becomes disconnected. +## +## peer_name: the name by which the peer identified itself. +## +## .. bro:see:: Comm::incoming_connection_established event Comm::incoming_connection_broken%(peer_name: string%); +## Listen for remote connections. +## +## p: the TCP port to listen on. +## +## a: an address string on which to accept connections, e.g. +## "127.0.0.1". An empty string refers to @p INADDR_ANY. +## +## reuse: equivalent to behavior of SO_REUSEADDR. +## +## Returns: true if the local endpoint is now listening for connections. +## +## .. bro:see:: Comm::incoming_connection_established function Comm::listen%(p: port, a: string &default = "", reuse: bool &default = T%): bool %{ @@ -67,6 +150,21 @@ function Comm::listen%(p: port, a: string &default = "", return new Val(rval, TYPE_BOOL); %} +## Initiate a remote connection. +## +## a: an address to connect to, e.g. "localhost" or "127.0.0.1". +## +## p: the TCP port on which the remote side is listening. +## +## retry: an interval at which to retry establishing the +## connection with the remote peer if it cannot be made initially, or +## if it ever becomes disconnected. +## +## Returns: true if it's possible to try connecting with the peer and +## it's a new peer. The actual connection may not be established +## a later point in time. +## +## .. bro:see:: Comm::outgoing_connection_established function Comm::connect%(a: string, p: port, retry: interval%): bool %{ if ( ! p->IsTCP() ) @@ -80,6 +178,14 @@ function Comm::connect%(a: string, p: port, retry: interval%): bool return new Val(rval, TYPE_BOOL); %} +## Remove a remote connection. +## +## a: the address used in previous successful call to :bro:see:`Comm::connect`. +## +## p: the port used in previous successful call to :bro:see:`Comm::connect`. +## +## Returns: true if the arguments match a previously successful call to +## :bro:see:`Comm::connect`. function Comm::disconnect%(a: string, p: port%): bool %{ if ( ! p->IsTCP() ) diff --git a/src/comm/data.bif b/src/comm/data.bif index 2a78a9229a..7120046920 100644 --- a/src/comm/data.bif +++ b/src/comm/data.bif @@ -7,6 +7,8 @@ module Comm; +## Enumerates the possible types that :bro:see:`Comm::Data` may be in terms of +## Bro data types. enum DataType %{ BOOL, INT, @@ -29,36 +31,78 @@ type Comm::Data: record; type Comm::TableItem: record; +## Convert any Bro value in to communication data. +## +## d: any Bro value to attempt to convert (not all types are supported). +## +## Returns: the converted communication data which may not set its only +## opaque field of the the conversion was not possible (the Bro data +## type does not support being converted to communicaiton data). function Comm::data%(d: any%): Comm::Data %{ return comm::make_data_val(d); %} +## Retrieve the type of data associated with communication data. +## +## d: the communication data. +## +## Returns: the data type associated with the communication data. function Comm::data_type%(d: Comm::Data%): Comm::DataType %{ return comm::get_data_type(d->AsRecordVal(), frame); %} +## Convert communication data with a type of :bro:see:`Comm::BOOL` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_bool%(d: Comm::Data%): bool %{ return comm::refine(d->AsRecordVal(), TYPE_BOOL, frame); %} +## Convert communication data with a type of :bro:see:`Comm::INT` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_int%(d: Comm::Data%): int %{ return comm::refine(d->AsRecordVal(), TYPE_INT, frame); %} +## Convert communication data with a type of :bro:see:`Comm::COUNT` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_count%(d: Comm::Data%): count %{ return comm::refine(d->AsRecordVal(), TYPE_COUNT, frame); %} +## Convert communication data with a type of :bro:see:`Comm::DOUBLE` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_double%(d: Comm::Data%): double %{ return comm::refine(d->AsRecordVal(), TYPE_DOUBLE, frame); %} +## Convert communication data with a type of :bro:see:`Comm::STRING` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_string%(d: Comm::Data%): string %{ return new StringVal(comm::require_data_type(d->AsRecordVal(), @@ -66,6 +110,12 @@ function Comm::refine_to_string%(d: Comm::Data%): string frame)); %} +## Convert communication data with a type of :bro:see:`Comm::ADDR` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_addr%(d: Comm::Data%): addr %{ auto& a = comm::require_data_type(d->AsRecordVal(), @@ -74,6 +124,12 @@ function Comm::refine_to_addr%(d: Comm::Data%): addr return new AddrVal(IPAddr(*bits)); %} +## Convert communication data with a type of :bro:see:`Comm::SUBNET` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_subnet%(d: Comm::Data%): subnet %{ auto& a = comm::require_data_type(d->AsRecordVal(), @@ -82,6 +138,12 @@ function Comm::refine_to_subnet%(d: Comm::Data%): subnet return new SubNetVal(IPPrefix(IPAddr(*bits), a.length())); %} +## Convert communication data with a type of :bro:see:`Comm::PORT` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_port%(d: Comm::Data%): port %{ auto& a = comm::require_data_type(d->AsRecordVal(), @@ -89,6 +151,12 @@ function Comm::refine_to_port%(d: Comm::Data%): port return new PortVal(a.number(), comm::to_bro_port_proto(a.type())); %} +## Convert communication data with a type of :bro:see:`Comm::TIME` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_time%(d: Comm::Data%): time %{ auto v = comm::require_data_type(d->AsRecordVal(), @@ -96,6 +164,12 @@ function Comm::refine_to_time%(d: Comm::Data%): time return new Val(v, TYPE_TIME); %} +## Convert communication data with a type of :bro:see:`Comm::INTERVAL` to +## an actual Bro value. +## +## d: the communication data to convert. +## +## Returns: the value retrieved from the communication data. function Comm::refine_to_interval%(d: Comm::Data%): interval %{ auto v = comm::require_data_type(d->AsRecordVal(), @@ -103,6 +177,13 @@ function Comm::refine_to_interval%(d: Comm::Data%): interval return new Val(v, TYPE_INTERVAL); %} +## Convert communication data with a type of :bro:see:`Comm::ENUM` to +## the name of the enum value. :bro:see:`lookup_ID` may be used to convert +## the name to the actual enum value. +## +## d: the communication data to convert. +## +## Returns: the enum name retrieved from the communication data. function Comm::refine_to_enum_name%(d: Comm::Data%): string %{ auto& v = comm::require_data_type(d->AsRecordVal(), @@ -110,11 +191,17 @@ function Comm::refine_to_enum_name%(d: Comm::Data%): string return new StringVal(v); %} +## Create communication data of type "set". function Comm::set_create%(%): Comm::Data %{ return comm::make_data_val(broker::set()); %} +## Remove all elements within a set. +## +## s: the set to clear. +## +## Returns: always true. function Comm::set_clear%(s: Comm::Data%): bool %{ auto& v = comm::require_data_type(s->AsRecordVal(), TYPE_TABLE, @@ -123,6 +210,11 @@ function Comm::set_clear%(s: Comm::Data%): bool return new Val(true, TYPE_BOOL); %} +## Get the number of elements within a set. +## +## s: the set to query. +## +## Returns: the number of elements in the set. function Comm::set_size%(s: Comm::Data%): count %{ auto& v = comm::require_data_type(s->AsRecordVal(), TYPE_TABLE, @@ -130,6 +222,13 @@ function Comm::set_size%(s: Comm::Data%): count return new Val(static_cast(v.size()), TYPE_COUNT); %} +## Check if a set contains a particular element. +## +## s: the set to query. +## +## key: the element to check for existence. +## +## Returns: true if the key exists in the set. function Comm::set_contains%(s: Comm::Data, key: Comm::Data%): bool %{ auto& v = comm::require_data_type(s->AsRecordVal(), TYPE_TABLE, @@ -138,6 +237,13 @@ function Comm::set_contains%(s: Comm::Data, key: Comm::Data%): bool return new Val(v.find(k) != v.end(), TYPE_BOOL); %} +### Insert an element into a set. +## +## s: the set to modify. +## +## key: the element to insert. +## +## Returns: true if the key was inserted, or false if it already existed. function Comm::set_insert%(s: Comm::Data, key: Comm::Data%): bool %{ auto& v = comm::require_data_type(s->AsRecordVal(), TYPE_TABLE, @@ -146,6 +252,13 @@ function Comm::set_insert%(s: Comm::Data, key: Comm::Data%): bool return new Val(v.insert(k).second, TYPE_BOOL); %} +## Remove an element from a set. +## +## s: the set to modify. +## +## key: the element to remove. +## +## Returns: true if the element existed in the set and is now removed. function Comm::set_remove%(s: Comm::Data, key: Comm::Data%): bool %{ auto& v = comm::require_data_type(s->AsRecordVal(), TYPE_TABLE, @@ -154,17 +267,36 @@ function Comm::set_remove%(s: Comm::Data, key: Comm::Data%): bool return new Val(v.erase(k) > 0, TYPE_BOOL); %} +## Create an iterator for a set. Note that this makes a copy of the set +## internally to ensure the iterator is always valid. +## +## s: the set to iterate over. +## +## Returns: an iterator. function Comm::set_iterator%(s: Comm::Data%): opaque of Comm::SetIterator %{ return new comm::SetIterator(s->AsRecordVal(), TYPE_TABLE, frame); %} +## Check if there are no more elements to iterate over. +## +## it: an iterator. +## +## Returns: true if there are no more elements to iterator over, i.e. +## the iterator is one-past-the-final-element. function Comm::set_iterator_last%(it: opaque of Comm::SetIterator%): bool %{ auto set_it = static_cast(it); return new Val(set_it->it == set_it->dat.end(), TYPE_BOOL); %} +## Advance an iterator. +## +## it: an iterator. +## +## Returns: true if the iterator, after advancing, still references an element +## in the collection. False if the iterator, after advancing, is +## one-past-the-final-element. function Comm::set_iterator_next%(it: opaque of Comm::SetIterator%): bool %{ auto set_it = static_cast(it); @@ -176,6 +308,11 @@ function Comm::set_iterator_next%(it: opaque of Comm::SetIterator%): bool return new Val(set_it->it != set_it->dat.end(), TYPE_BOOL); %} +## Retrieve the data at an iterator's current position. +## +## it: an iterator. +## +## Returns: element in the collection that the iterator currently references. function Comm::set_iterator_value%(it: opaque of Comm::SetIterator%): Comm::Data %{ auto set_it = static_cast(it); @@ -193,11 +330,17 @@ function Comm::set_iterator_value%(it: opaque of Comm::SetIterator%): Comm::Data return rval; %} +## Create communication data of type "table". function Comm::table_create%(%): Comm::Data %{ return comm::make_data_val(broker::table()); %} +## Remove all elements within a table. +## +## t: the table to clear. +## +## Returns: always true. function Comm::table_clear%(t: Comm::Data%): bool %{ auto& v = comm::require_data_type(t->AsRecordVal(), @@ -206,6 +349,11 @@ function Comm::table_clear%(t: Comm::Data%): bool return new Val(true, TYPE_BOOL); %} +## Get the number of elements within a table. +## +## t: the table to query. +## +## Returns: the number of elements in the table. function Comm::table_size%(t: Comm::Data%): count %{ auto& v = comm::require_data_type(t->AsRecordVal(), @@ -213,6 +361,13 @@ function Comm::table_size%(t: Comm::Data%): count return new Val(static_cast(v.size()), TYPE_COUNT); %} +## Check if a table contains a particular key. +## +## t: the table to query. +## +## key: the key to check for existence. +## +## Returns: true if the key exists in the set. function Comm::table_contains%(t: Comm::Data, key: Comm::Data%): bool %{ auto& v = comm::require_data_type(t->AsRecordVal(), @@ -221,6 +376,16 @@ function Comm::table_contains%(t: Comm::Data, key: Comm::Data%): bool return new Val(v.find(k) != v.end(), TYPE_BOOL); %} +## Insert a key-value pair into a table. +## +## t: the table to modify. +## +## key: the key at which to insert the value. +## +## val: the value to insert. +## +## Returns: true if the key-value pair was inserted, or false if the key +## already existed in the table. function Comm::table_insert%(t: Comm::Data, key: Comm::Data, val: Comm::Data%): Comm::Data %{ auto& table = comm::require_data_type(t->AsRecordVal(), @@ -242,6 +407,14 @@ function Comm::table_insert%(t: Comm::Data, key: Comm::Data, val: Comm::Data%): } %} +## Remove a key-value pair from a table. +## +## t: the table to modify. +## +## key: the key to remove from the table. +## +## Returns: the value associated with the key. If the key did not exist, then +## the optional field of the returned record is not set. function Comm::table_remove%(t: Comm::Data, key: Comm::Data%): Comm::Data %{ auto& table = comm::require_data_type(t->AsRecordVal(), @@ -259,6 +432,14 @@ function Comm::table_remove%(t: Comm::Data, key: Comm::Data%): Comm::Data } %} +## Retrieve a value from a table. +## +## t: the table to query. +## +## key: the key to lookup. +## +## Returns: the value associated with the key. If the key did not exist, then +## the optional field of the returned record is not set. function Comm::table_lookup%(t: Comm::Data, key: Comm::Data%): Comm::Data %{ auto& table = comm::require_data_type(t->AsRecordVal(), @@ -272,17 +453,36 @@ function Comm::table_lookup%(t: Comm::Data, key: Comm::Data%): Comm::Data return comm::make_data_val(it->second); %} +## Create an iterator for a table. Note that this makes a copy of the table +## internally to ensure the iterator is always valid. +## +## t: the table to iterate over. +## +## Returns: an iterator. function Comm::table_iterator%(t: Comm::Data%): opaque of Comm::TableIterator %{ return new comm::TableIterator(t->AsRecordVal(), TYPE_TABLE, frame); %} +## Check if there are no more elements to iterate over. +## +## it: an iterator. +## +## Returns: true if there are no more elements to iterator over, i.e. +## the iterator is one-past-the-final-element. function Comm::table_iterator_last%(it: opaque of Comm::TableIterator%): bool %{ auto ti = static_cast(it); return new Val(ti->it == ti->dat.end(), TYPE_BOOL); %} +## Advance an iterator. +## +## it: an iterator. +## +## Returns: true if the iterator, after advancing, still references an element +## in the collection. False if the iterator, after advancing, is +## one-past-the-final-element. function Comm::table_iterator_next%(it: opaque of Comm::TableIterator%): bool %{ auto ti = static_cast(it); @@ -294,6 +494,11 @@ function Comm::table_iterator_next%(it: opaque of Comm::TableIterator%): bool return new Val(ti->it != ti->dat.end(), TYPE_BOOL); %} +## Retrieve the data at an iterator's current position. +## +## it: an iterator. +## +## Returns: element in the collection that the iterator currently references. function Comm::table_iterator_value%(it: opaque of Comm::TableIterator%): Comm::TableItem %{ auto ti = static_cast(it); @@ -316,11 +521,17 @@ function Comm::table_iterator_value%(it: opaque of Comm::TableIterator%): Comm:: return rval; %} +## Create communication data of type "vector". function Comm::vector_create%(%): Comm::Data %{ return comm::make_data_val(broker::vector()); %} +## Remove all elements within a vector. +## +## v: the vector to clear. +## +## Returns: always true. function Comm::vector_clear%(v: Comm::Data%): bool %{ auto& vec = comm::require_data_type(v->AsRecordVal(), @@ -329,6 +540,11 @@ function Comm::vector_clear%(v: Comm::Data%): bool return new Val(true, TYPE_BOOL); %} +## Get the number of elements within a vector. +## +## v: the vector to query. +## +## Returns: the number of elements in the vector. function Comm::vector_size%(v: Comm::Data%): count %{ auto& vec = comm::require_data_type(v->AsRecordVal(), @@ -336,6 +552,17 @@ function Comm::vector_size%(v: Comm::Data%): count return new Val(static_cast(vec.size()), TYPE_COUNT); %} +## Insert an element into a vector at a particular position, possibly displacing +## existing elements (insertion always grows the size of the vector by one). +## +## v: the vector to modify. +## +## d: the element to insert. +## +## idx: the index at which to insert the data. If it is greater than the +## current size of the vector, the element is inserted at the end. +## +## Returns: always true. function Comm::vector_insert%(v: Comm::Data, d: Comm::Data, idx: count%): bool %{ auto& vec = comm::require_data_type(v->AsRecordVal(), @@ -346,6 +573,16 @@ function Comm::vector_insert%(v: Comm::Data, d: Comm::Data, idx: count%): bool return new Val(true, TYPE_BOOL); %} +## Replace an element in a vector at a particular position. +## +## v: the vector to modify. +## +## d: the element to insert. +## +## idx: the index to replace. +## +## Returns: the value that was just evicted. If the index was larger than any +## valid index, the optional field of the returned record is not set. function Comm::vector_replace%(v: Comm::Data, d: Comm::Data, idx: count%): Comm::Data %{ auto& vec = comm::require_data_type(v->AsRecordVal(), @@ -360,6 +597,14 @@ function Comm::vector_replace%(v: Comm::Data, d: Comm::Data, idx: count%): Comm: return rval; %} +## Remove an element from a vector at a particular position. +## +## v: the vector to modify. +## +## idx: the index to remove. +## +## Returns: the value that was just evicted. If the index was larger than any +## valid index, the optional field of the returned record is not set. function Comm::vector_remove%(v: Comm::Data, idx: count%): Comm::Data %{ auto& vec = comm::require_data_type(v->AsRecordVal(), @@ -373,6 +618,14 @@ function Comm::vector_remove%(v: Comm::Data, idx: count%): Comm::Data return rval; %} +## Lookup an element in a vector at a particular position. +## +## v: the vector to query. +## +## idx: the index to lookup. +## +## Returns: the value at the index. If the index was larger than any +## valid index, the optional field of the returned record is not set. function Comm::vector_lookup%(v: Comm::Data, idx: count%): Comm::Data %{ auto& vec = comm::require_data_type(v->AsRecordVal(), @@ -384,17 +637,36 @@ function Comm::vector_lookup%(v: Comm::Data, idx: count%): Comm::Data return comm::make_data_val(vec[idx]); %} +## Create an iterator for a vector. Note that this makes a copy of the vector +## internally to ensure the iterator is always valid. +## +## v: the vector to iterate over. +## +## Returns: an iterator. function Comm::vector_iterator%(v: Comm::Data%): opaque of Comm::VectorIterator %{ return new comm::VectorIterator(v->AsRecordVal(), TYPE_VECTOR, frame); %} +## Check if there are no more elements to iterate over. +## +## it: an iterator. +## +## Returns: true if there are no more elements to iterator over, i.e. +## the iterator is one-past-the-final-element. function Comm::vector_iterator_last%(it: opaque of Comm::VectorIterator%): bool %{ auto vi = static_cast(it); return new Val(vi->it == vi->dat.end(), TYPE_BOOL); %} +## Advance an iterator. +## +## it: an iterator. +## +## Returns: true if the iterator, after advancing, still references an element +## in the collection. False if the iterator, after advancing, is +## one-past-the-final-element. function Comm::vector_iterator_next%(it: opaque of Comm::VectorIterator%): bool %{ auto vi = static_cast(it); @@ -406,6 +678,11 @@ function Comm::vector_iterator_next%(it: opaque of Comm::VectorIterator%): bool return new Val(vi->it != vi->dat.end(), TYPE_BOOL); %} +## Retrieve the data at an iterator's current position. +## +## it: an iterator. +## +## Returns: element in the collection that the iterator currently references. function Comm::vector_iterator_value%(it: opaque of Comm::VectorIterator%): Comm::Data %{ auto vi = static_cast(it); @@ -414,7 +691,7 @@ function Comm::vector_iterator_value%(it: opaque of Comm::VectorIterator%): Comm if ( vi->it == vi->dat.end() ) { reporter->PushLocation(frame->GetCall()->GetLocationInfo()); - reporter->Warning("attempt to retrieve value of invalid table iterator"); + reporter->Warning("attempt to retrieve value of invalid vector iterator"); reporter->PopLocation(); return rval; } @@ -423,11 +700,21 @@ function Comm::vector_iterator_value%(it: opaque of Comm::VectorIterator%): Comm return rval; %} +## Create communication data of type "record". +## +## sz: the number of fields in the record. +## +## Returns: record data, with all fields uninitialized. function Comm::record_create%(sz: count%): Comm::Data %{ return comm::make_data_val(broker::record(std::vector(sz))); %} +## Get the number of fields within a record. +## +## r: the record to query. +## +## Returns: the number of fields in the record. function Comm::record_size%(r: Comm::Data%): count %{ auto& v = comm::require_data_type(r->AsRecordVal(), @@ -435,6 +722,15 @@ function Comm::record_size%(r: Comm::Data%): count return new Val(static_cast(v.fields.size()), TYPE_COUNT); %} +## Replace a field in a record at a particular position. +## +## t: the table to modify. +## +## d: the new field value to assign. +## +## idx: the index to replace. +## +## Returns: false if the index was larger than any valid index, else true. function Comm::record_assign%(r: Comm::Data, d: Comm::Data, idx: count%): bool %{ auto& v = comm::require_data_type(r->AsRecordVal(), @@ -448,6 +744,15 @@ function Comm::record_assign%(r: Comm::Data, d: Comm::Data, idx: count%): bool return new Val(true, TYPE_BOOL); %} +## Lookup a field in a record at a particular position. +## +## r: the record to query. +## +## idx: the index to lookup. +## +## Returns: the value at the index. The optional field of the returned record +## may not be set if the field of the record has no value or if the +## the index was not valid. function Comm::record_lookup%(r: Comm::Data, idx: count%): Comm::Data %{ auto& v = comm::require_data_type(r->AsRecordVal(), @@ -462,17 +767,36 @@ function Comm::record_lookup%(r: Comm::Data, idx: count%): Comm::Data return comm::make_data_val(*v.fields[idx]); %} +## Create an iterator for a record. Note that this makes a copy of the record +## internally to ensure the iterator is always valid. +## +## r: the record to iterate over. +## +## Returns: an iterator. function Comm::record_iterator%(r: Comm::Data%): opaque of Comm::RecordIterator %{ return new comm::RecordIterator(r->AsRecordVal(), TYPE_RECORD, frame); %} +## Check if there are no more elements to iterate over. +## +## it: an iterator. +## +## Returns: true if there are no more elements to iterator over, i.e. +## the iterator is one-past-the-final-element. function Comm::record_iterator_last%(it: opaque of Comm::RecordIterator%): bool %{ auto ri = static_cast(it); return new Val(ri->it == ri->dat.fields.end(), TYPE_BOOL); %} +## Advance an iterator. +## +## it: an iterator. +## +## Returns: true if the iterator, after advancing, still references an element +## in the collection. False if the iterator, after advancing, is +## one-past-the-final-element. function Comm::record_iterator_next%(it: opaque of Comm::RecordIterator%): bool %{ auto ri = static_cast(it); @@ -484,6 +808,11 @@ function Comm::record_iterator_next%(it: opaque of Comm::RecordIterator%): bool return new Val(ri->it != ri->dat.fields.end(), TYPE_BOOL); %} +## Retrieve the data at an iterator's current position. +## +## it: an iterator. +## +## Returns: element in the collection that the iterator currently references. function Comm::record_iterator_value%(it: opaque of Comm::RecordIterator%): Comm::Data %{ auto ri = static_cast(it); diff --git a/src/comm/messaging.bif b/src/comm/messaging.bif index 26f9497449..fb65c981b2 100644 --- a/src/comm/messaging.bif +++ b/src/comm/messaging.bif @@ -14,6 +14,15 @@ type Comm::EventArgs: record; event Comm::print_handler%(msg: string%); +## Print a simple message to any interested peers. +## +## topic: a topic associated with the printed message. +## +## msg: the print message to send to peers. +## +## flags: tune the behavior of how the message is sent. +## +## Returns: true if the message is sent. function Comm::print%(topic: string, msg: string, flags: SendFlags &default = SendFlags()%): bool %{ @@ -22,24 +31,53 @@ function Comm::print%(topic: string, msg: string, return new Val(rval, TYPE_BOOL); %} +## Register interest in all peer print messages that use a certain topic prefix. +## +## topic_prefix: a prefix to match against remote message topics. +## e.g. an empty prefix matches everything and "a" matches +## "alice" and "amy" but not "bob". +## +## Returns: true if it's a new print subscription and it is now registered. function Comm::subscribe_to_prints%(topic_prefix: string%): bool %{ auto rval = comm_mgr->SubscribeToPrints(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} +## Unregister interest in all peer print messages that use a topic prefix. +## +## topic_prefix: a prefix previously supplied to a successful call to +## :bro:see:`Comm::subscribe_to_prints`. +## +## Returns: true if interest in the topic prefix is no longer advertised. function Comm::unsubscribe_to_prints%(topic_prefix: string%): bool %{ auto rval = comm_mgr->UnsubscribeToPrints(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} +## Create a data structure that may be used to send a remote event via +## :bro:see:`Comm::event`. +## +## args: an event, followed by a list of argument values that may be used +## to call it. +## +## Returns: opaque communication data that may be used to send a remote event. function Comm::event_args%(...%): Comm::EventArgs %{ auto rval = comm_mgr->MakeEventArgs(@ARGS@); return rval; %} +## Send an event to any interested peers. +## +## topic: a topic associated with the event message. +## +## args: event arguments as made by :bro:see:`Comm::event_args`. +## +## flags: tune the behavior of how the message is sent. +## +## Returns: true if the message is sent. function Comm::event%(topic: string, args: Comm::EventArgs, flags: SendFlags &default = SendFlags()%): bool %{ @@ -48,6 +86,18 @@ function Comm::event%(topic: string, args: Comm::EventArgs, return new Val(rval, TYPE_BOOL); %} +## Automatically send an event to any interested peers whenever it is +## locally dispatched (e.g. using "event my_event(...);" in a script). +## +## topic: a topic string associated with the event message. +## Peers advertise interest by registering a subscription to some prefix +## of this topic name. +## +## ev: a Bro event value. +## +## flags: tune the behavior of how the message is send. +## +## Returns: true if automatic event sending is now enabled. function Comm::auto_event%(topic: string, ev: any, flags: SendFlags &default = SendFlags()%): bool %{ @@ -55,51 +105,101 @@ function Comm::auto_event%(topic: string, ev: any, return new Val(rval, TYPE_BOOL); %} +## Stop automatically sending an event to peers upon local dispatch. +## +## topic: a topic originally given to :bro:see:`Comm::auto_event`. +## +## ev: an event originally given to :bro:see:`Comm::auto_event`. +## +## Returns: true if automatic events will no occur for the topic/event pair. function Comm::auto_event_stop%(topic: string, ev: any%): bool %{ auto rval = comm_mgr->AutoEventStop(topic->CheckString(), ev); return new Val(rval, TYPE_BOOL); %} +## Register interest in all peer event messages that use a certain topic prefix. +## +## topic_prefix: a prefix to match against remote message topics. +## e.g. an empty prefix matches everything and "a" matches +## "alice" and "amy" but not "bob". +## +## Returns: true if it's a new event subscription and it is now registered. function Comm::subscribe_to_events%(topic_prefix: string%): bool %{ auto rval = comm_mgr->SubscribeToEvents(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} +## Unregister interest in all peer event messages that use a topic prefix. +## +## topic_prefix: a prefix previously supplied to a successful call to +## :bro:see:`Comm::subscribe_to_events`. +## +## Returns: true if interest in the topic prefix is no longer advertised. function Comm::unsubscribe_to_events%(topic_prefix: string%): bool %{ auto rval = comm_mgr->UnsubscribeToEvents(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} +## Enable remote logs for a given log stream. +## +## id: the log stream to enable remote logs for. +## +## flags: tune the behavior of how log entry messages are sent. +## +## Returns: true if remote logs are enabled for the stream. function Comm::enable_remote_logs%(id: Log::ID, flags: SendFlags &default = SendFlags()%): bool %{ auto rval = log_mgr->EnableRemoteLogs(id->AsEnumVal(), - comm::Manager::GetFlags(flags)); + comm::Manager::send_flags_to_int(flags)); return new Val(rval, TYPE_BOOL); %} +## Disable remote logs for a given log stream. +## +## id: the log stream to disable remote logs for. +## +## Returns: true if remote logs are disabled for the stream. function Comm::disable_remote_logs%(id: Log::ID%): bool %{ auto rval = log_mgr->DisableRemoteLogs(id->AsEnumVal()); return new Val(rval, TYPE_BOOL); %} +## Returns: true if remote logs are enabled for the given stream. function Comm::remote_logs_enabled%(id: Log::ID%): bool %{ auto rval = log_mgr->RemoteLogsAreEnabled(id->AsEnumVal()); return new Val(rval, TYPE_BOOL); %} +## Register interest in all peer log messages that use a certain topic prefix. +## Logs are implicitly sent with topic "bro/log/" and the +## receiving side processes them through the logging framework as usual. +## +## topic_prefix: a prefix to match against remote message topics. +## e.g. an empty prefix matches everything and "a" matches +## "alice" and "amy" but not "bob". +## +## Returns: true if it's a new log subscription and it is now registered. function Comm::subscribe_to_logs%(topic_prefix: string%): bool %{ auto rval = comm_mgr->SubscribeToLogs(topic_prefix->CheckString()); return new Val(rval, TYPE_BOOL); %} +## Unregister interest in all peer log messages that use a topic prefix. +## Logs are implicitly sent with topic "bro/log/" and the +## receiving side processes them through the logging framework as usual. +## +## topic_prefix: a prefix previously supplied to a successful call to +## :bro:see:`Comm::subscribe_to_logs`. +## +## Returns: true if interest in the topic prefix is no longer advertised. function Comm::unsubscribe_to_logs%(topic_prefix: string%): bool %{ auto rval = comm_mgr->UnsubscribeToLogs(topic_prefix->CheckString()); diff --git a/src/comm/store.bif b/src/comm/store.bif index 18e63282e8..6a27c05dcb 100644 --- a/src/comm/store.bif +++ b/src/comm/store.bif @@ -16,12 +16,22 @@ type Store::QueryResult: record; type Store::BackendOptions: record; +## Enumerates the possible storage backends. enum BackendType %{ MEMORY, SQLITE, ROCKSDB, %} +## Create a master data store which contains key-value pairs. +## +## id: a unique name for the data store. +## +## b: the storage backend to use. +## +## options: tunes how some storage backends operate. +## +## Returns: a handle to the data store. function Store::create_master%(id: string, b: BackendType &default = MEMORY, options: BackendOptions &default = BackendOptions()%): opaque of Store::Handle %{ @@ -42,6 +52,28 @@ function Store::create_master%(id: string, b: BackendType &default = MEMORY, return rval; %} +## Create a clone of a master data store which may live with a remote peer. +## A clone automatically synchronizes to the master by automatically receiving +## modifications and applying them locally. Direct modifications are not +## possible, they must be sent through the master store, which then +## automatically broadcasts the changes out to clones. But queries may be made +## directly against the local cloned copy, which may be resolved quicker than +## reaching out to a remote master store. +## +## id: the unique name which identifies the master data store. +## +## b: the storage backend to use. +## +## options: tunes how some storage backends operate. +## +## resync: the interval at which to re-attempt synchronizing with the master +## store should the connection be lost. If the clone has not yet +## synchronized for the first time, updates and queries queue up until +## the synchronization completes. After, if the connection to the +## master store is lost, queries continue to use the clone's version, +## but updates will be lost until the master is once again available. +## +## Returns: a handle to the data store. function Store::create_clone%(id: string, b: BackendType &default = MEMORY, options: BackendOptions &default = BackendOptions(), resync: interval &default = 1sec%): opaque of Store::Handle @@ -64,6 +96,12 @@ function Store::create_clone%(id: string, b: BackendType &default = MEMORY, return rval; %} +## Create a frontend interface to an existing master data store that allows +## querying and updating its contents. +## +## id: the unique name which identifies the master data store. +## +## Returns: a handle to the data store. function Store::create_frontend%(id: string%): opaque of Store::Handle %{ auto id_str = id->CheckString(); @@ -81,6 +119,12 @@ function Store::create_frontend%(id: string%): opaque of Store::Handle return rval; %} +## Close a data store. +## +## h: a data store handle. +## +## Returns: true if store was valid and is now closed. The handle can no +## longer be used for data store operations. function Store::close_by_handle%(h: opaque of Store::Handle%): bool %{ auto handle = static_cast(h); @@ -96,6 +140,17 @@ function Store::close_by_handle%(h: opaque of Store::Handle%): bool # non-blocking update API # ########################### +## Insert a key-value pair in to the store. +## +## h: the handle of the store to modify. +## +## k: the key to insert. +## +## v: the value to insert. +## +## e: the expiration time of the key-value pair. +## +## Returns: false if the store handle was not valid. function Store::insert%(h: opaque of Store::Handle, k: Comm::Data, v: Comm::Data, e: Store::ExpiryTime &default = Store::ExpiryTime()%): bool @@ -134,6 +189,13 @@ function Store::insert%(h: opaque of Store::Handle, return new Val(true, TYPE_BOOL); %} +## Remove a key-value pair from the store. +## +## h: the handle of the store to modify. +## +## k: the key to remove. +## +## Returns: false if the store handle was not valid. function Store::erase%(h: opaque of Store::Handle, k: Comm::Data%): bool %{ auto handle = static_cast(h); @@ -146,6 +208,11 @@ function Store::erase%(h: opaque of Store::Handle, k: Comm::Data%): bool return new Val(true, TYPE_BOOL); %} +## Remove all key-value pairs from the store. +## +## h: the handle of the store to modify. +## +## Returns: false if the store handle was not valid. function Store::clear%(h: opaque of Store::Handle%): bool %{ auto handle = static_cast(h); @@ -157,6 +224,16 @@ function Store::clear%(h: opaque of Store::Handle%): bool return new Val(true, TYPE_BOOL); %} +## Increment an integer value in a data store. +## +## h: the handle of the store to modify. +## +## k: the key whose associated value is to be modified. +## +## by: the amount to increment the value by. A non-existent key will first +## create it with an implicit value of zero before incrementing. +## +## Returns: false if the store handle was not valid. function Store::increment%(h: opaque of Store::Handle, k: Comm::Data, by: int &default = +1%): bool %{ @@ -170,6 +247,16 @@ function Store::increment%(h: opaque of Store::Handle, return new Val(true, TYPE_BOOL); %} +## Decrement an integer value in a data store. +## +## h: the handle of the store to modify. +## +## k: the key whose associated value is to be modified. +## +## by: the amount to decrement the value by. A non-existent key will first +## create it with an implicit value of zero before decrementing. +## +## Returns: false if the store handle was not valid. function Store::decrement%(h: opaque of Store::Handle, k: Comm::Data, by: int &default = +1%): bool %{ @@ -183,6 +270,16 @@ function Store::decrement%(h: opaque of Store::Handle, return new Val(true, TYPE_BOOL); %} +## Add an element to a set value in a data store. +## +## h: the handle of the store to modify. +## +## k: the key whose associated value is to be modified. +## +## element: the element to add to the set. A non-existent key will first +## create it with an implicit empty set value before modifying. +## +## Returns: false if the store handle was not valid. function Store::add_to_set%(h: opaque of Store::Handle, k: Comm::Data, element: Comm::Data%): bool %{ @@ -197,6 +294,16 @@ function Store::add_to_set%(h: opaque of Store::Handle, return new Val(true, TYPE_BOOL); %} +## Remove an element from a set value in a data store. +## +## h: the handle of the store to modify. +## +## k: the key whose associated value is to be modified. +## +## element: the element to remove from the set. A non-existent key will +## implicitly create an empty set value associated with the key. +## +## Returns: false if the store handle was not valid. function Store::remove_from_set%(h: opaque of Store::Handle, k: Comm::Data, element: Comm::Data%): bool %{ @@ -211,6 +318,16 @@ function Store::remove_from_set%(h: opaque of Store::Handle, return new Val(true, TYPE_BOOL); %} +## Add a new item to the head of a vector value in a data store. +## +## h: the handle of store to modify. +## +## k: the key whose associated value is to be modified. +## +## item: the element to insert in to the vector. A non-existent key will first +## create empty vector value before modifying. +## +## Returns: the handle of store to modify. function Store::push_left%(h: opaque of Store::Handle, k: Comm::Data, items: Comm::DataVector%): bool %{ @@ -234,6 +351,16 @@ function Store::push_left%(h: opaque of Store::Handle, k: Comm::Data, return new Val(true, TYPE_BOOL); %} +## Add a new item to the tail of a vector value in a data store. +## +## h: the handle of store to modify. +## +## k: the key whose associated value is to be modified. +## +## item: the element to insert in to the vector. A non-existent key will first +## create empty vector value before modifying. +## +## Returns: the handle of store to modify. function Store::push_right%(h: opaque of Store::Handle, k: Comm::Data, items: Comm::DataVector%): bool %{ @@ -297,20 +424,23 @@ static bool prepare_for_query(Val* opaque, Frame* frame, *cb = new comm::StoreQueryCallback(trigger, frame->GetCall(), (*handle)->store->id(), (*handle)->store_type); - comm_mgr->TrackStoreQuery(*cb); + comm_mgr->TrackStoreQuery(*cb); return true; } %%} +## Pop the head of a data store vector value. +## +## h: the handle of the store to query. +## +## k: the key associated with the vector to modify. +## +## Returns: the result of the query. function Store::pop_left%(h: opaque of Store::Handle, k: Comm::Data%): Store::QueryResult %{ - double timeout; - comm::StoreQueryCallback* cb; - comm::StoreHandleVal* handle; - - if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + if ( ! comm_mgr->Enabled() ) return comm::query_result(); Val* key = k->AsRecordVal()->Lookup(0); @@ -318,19 +448,29 @@ function Store::pop_left%(h: opaque of Store::Handle, if ( ! key ) return comm::query_result(); + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + handle->store->pop_left(static_cast(key)->data, std::chrono::duration(timeout), cb); return 0; %} +## Pop the tail of a data store vector value. +## +## h: the handle of the store to query. +## +## k: the key associated with the vector to modify. +## +## Returns: the result of the query. function Store::pop_right%(h: opaque of Store::Handle, k: Comm::Data%): Store::QueryResult %{ - double timeout; - comm::StoreQueryCallback* cb; - comm::StoreHandleVal* handle; - - if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + if ( ! comm_mgr->Enabled() ) return comm::query_result(); Val* key = k->AsRecordVal()->Lookup(0); @@ -338,19 +478,29 @@ function Store::pop_right%(h: opaque of Store::Handle, if ( ! key ) return comm::query_result(); + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + handle->store->pop_right(static_cast(key)->data, std::chrono::duration(timeout), cb); return 0; %} +## Lookup the value associated with a key in a data store. +## +## h: the handle of the store to query. +## +## k: the key to lookup. +## +## Returns: the result of the query. function Store::lookup%(h: opaque of Store::Handle, k: Comm::Data%): Store::QueryResult %{ - double timeout; - comm::StoreQueryCallback* cb; - comm::StoreHandleVal* handle; - - if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + if ( ! comm_mgr->Enabled() ) return comm::query_result(); Val* key = k->AsRecordVal()->Lookup(0); @@ -358,19 +508,29 @@ function Store::lookup%(h: opaque of Store::Handle, if ( ! key ) return comm::query_result(); + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + handle->store->lookup(static_cast(key)->data, std::chrono::duration(timeout), cb); return 0; %} +## Check if a data store contains a given key. +## +## h: the handle of the store to query. +## +## k: the key to check for existence. +## +## Returns: the result of the query (uses :bro:see:`Comm::BOOL`). function Store::exists%(h: opaque of Store::Handle, k: Comm::Data%): Store::QueryResult %{ - double timeout; - comm::StoreQueryCallback* cb; - comm::StoreHandleVal* handle; - - if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + if ( ! comm_mgr->Enabled() ) return comm::query_result(); Val* key = k->AsRecordVal()->Lookup(0); @@ -378,11 +538,23 @@ function Store::exists%(h: opaque of Store::Handle, if ( ! key ) return comm::query_result(); + double timeout; + comm::StoreQueryCallback* cb; + comm::StoreHandleVal* handle; + + if ( ! prepare_for_query(h, frame, &handle, &timeout, &cb) ) + return comm::query_result(); + handle->store->exists(static_cast(key)->data, std::chrono::duration(timeout), cb); return 0; %} +## Retrieve all keys in a data store. +## +## h: the handle of the store to query. +## +## Returns: the result of the query (uses :bro:see:`Comm::VECTOR`). function Store::keys%(h: opaque of Store::Handle%): Store::QueryResult %{ double timeout; @@ -396,8 +568,16 @@ function Store::keys%(h: opaque of Store::Handle%): Store::QueryResult return 0; %} +## Get the number of key-value pairs in a data store. +## +## h: the handle of the store to query. +## +## Returns: the result of the query (uses :bro:see:`Comm::COUNT`). function Store::size%(h: opaque of Store::Handle%): Store::QueryResult %{ + if ( ! comm_mgr->Enabled() ) + return comm::query_result(); + double timeout; comm::StoreQueryCallback* cb; comm::StoreHandleVal* handle; diff --git a/src/logging/Manager.h b/src/logging/Manager.h index 8130a1ddd4..5d3372fb9b 100644 --- a/src/logging/Manager.h +++ b/src/logging/Manager.h @@ -158,12 +158,30 @@ public: void Terminate(); #ifdef ENABLE_BROKER + /** + * Enable remote logs for a given stream. + * @param stream_id the stream to enable remote logs for. + * @param flags tune behavior of how log entries are sent to peer endpoints. + * @return true if remote logs are enabled. + */ bool EnableRemoteLogs(EnumVal* stream_id, int flags); + /** + * Disable remote logs for a given stream. + * @param stream_id the stream to disable remote logs for. + * @return true if remote logs are disabled. + */ bool DisableRemoteLogs(EnumVal* stream_id); + /** + * @return true if remote logs are enabled for a given stream. + */ bool RemoteLogsAreEnabled(EnumVal* stream_id); + /** + * @return the type which corresponds to the columns in a log entry for + * a given log stream. + */ RecordType* StreamColumns(EnumVal* stream_id); #endif