mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
cluster/zeromq: Comments and move lookups to InitPostScript()
This commit is contained in:
parent
85d5dda028
commit
5de9296c77
2 changed files with 26 additions and 20 deletions
|
@ -124,6 +124,14 @@ void ZeroMQBackend::DoInitPostScript() {
|
||||||
xpub_sndbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt());
|
xpub_sndbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xpub_sndbuf")->AsInt());
|
||||||
xsub_rcvhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt());
|
xsub_rcvhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvhwm")->AsInt());
|
||||||
xsub_rcvbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt());
|
xsub_rcvbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::xsub_rcvbuf")->AsInt());
|
||||||
|
|
||||||
|
// log push/pull socket configuration
|
||||||
|
log_immediate =
|
||||||
|
static_cast<int>(zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::log_immediate")->AsBool());
|
||||||
|
log_sndhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt());
|
||||||
|
log_sndbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt());
|
||||||
|
log_rcvhwm = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt());
|
||||||
|
log_rcvbuf = static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ZeroMQBackend::DoTerminate() {
|
void ZeroMQBackend::DoTerminate() {
|
||||||
|
@ -206,22 +214,6 @@ bool ZeroMQBackend::DoInit() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
auto log_immediate =
|
|
||||||
static_cast<int>(zeek::id::find_val<zeek::BoolVal>("Cluster::Backend::ZeroMQ::log_immediate")->AsBool());
|
|
||||||
|
|
||||||
auto log_sndhwm =
|
|
||||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndhwm")->AsInt());
|
|
||||||
|
|
||||||
auto log_sndbuf =
|
|
||||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_sndbuf")->AsInt());
|
|
||||||
|
|
||||||
auto log_rcvhwm =
|
|
||||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvhwm")->AsInt());
|
|
||||||
|
|
||||||
auto log_rcvbuf =
|
|
||||||
static_cast<int>(zeek::id::find_val<zeek::IntVal>("Cluster::Backend::ZeroMQ::log_rcvbuf")->AsInt());
|
|
||||||
|
|
||||||
ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf,
|
ZEROMQ_DEBUG("Setting log_sndhwm=%d log_sndbuf=%d log_rcvhwm=%d log_rcvbuf=%d linger_ms=%d", log_sndhwm, log_sndbuf,
|
||||||
log_rcvhwm, log_rcvbuf, linger_ms);
|
log_rcvhwm, log_rcvbuf, linger_ms);
|
||||||
|
|
||||||
|
@ -244,6 +236,11 @@ bool ZeroMQBackend::DoInit() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The connect_log_endpoints variable may be modified by zeek_init(), so
|
||||||
|
// need to look it up here rather than during DoInitPostScript().
|
||||||
|
//
|
||||||
|
// We should've probably introduced a configuration record similar to the
|
||||||
|
// storage framework, too. Hmm. Maybe in the future.
|
||||||
const auto& log_endpoints = zeek::id::find_val<zeek::VectorVal>("Cluster::Backend::ZeroMQ::connect_log_endpoints");
|
const auto& log_endpoints = zeek::id::find_val<zeek::VectorVal>("Cluster::Backend::ZeroMQ::connect_log_endpoints");
|
||||||
for ( unsigned int i = 0; i < log_endpoints->Size(); i++ )
|
for ( unsigned int i = 0; i < log_endpoints->Size(); i++ )
|
||||||
connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString());
|
connect_log_endpoints.push_back(log_endpoints->StringValAt(i)->ToStdString());
|
||||||
|
@ -313,9 +310,11 @@ bool ZeroMQBackend::DoPublishEvent(const std::string& topic, const std::string&
|
||||||
if ( i < parts.size() - 1 )
|
if ( i < parts.size() - 1 )
|
||||||
flags = flags | zmq::send_flags::sndmore;
|
flags = flags | zmq::send_flags::sndmore;
|
||||||
|
|
||||||
// This should never fail, it will instead block
|
// This never returns EAGAIN. A pair socket blocks whenever the hwm
|
||||||
// when HWM is reached. I guess we need to see if
|
// is reached, regardless of passing any dontwait flag.
|
||||||
// and how this can happen :-/
|
//
|
||||||
|
// This can result in blocking on Cluster::publish() if the inner
|
||||||
|
// thread does not consume from child_inproc.
|
||||||
try {
|
try {
|
||||||
main_inproc.send(parts[i], flags);
|
main_inproc.send(parts[i], flags);
|
||||||
} catch ( const zmq::error_t& err ) {
|
} catch ( const zmq::error_t& err ) {
|
||||||
|
@ -512,7 +511,7 @@ void ZeroMQBackend::HandleInprocMessages(std::vector<MultipartMessage>& msgs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
ZEROMQ_THREAD_PRINTF("inproc: error: expected 1 or 4 parts, have %zu!\n", msg.size());
|
ZEROMQ_THREAD_PRINTF("inproc: error: expected 2 or 4 parts, have %zu!\n", msg.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,6 +105,13 @@ private:
|
||||||
int xsub_rcvhwm = 1000; // libzmq default
|
int xsub_rcvhwm = 1000; // libzmq default
|
||||||
int xsub_rcvbuf = -1; // OS defaults
|
int xsub_rcvbuf = -1; // OS defaults
|
||||||
|
|
||||||
|
// log socket configuration
|
||||||
|
int log_immediate = false; // libzmq default
|
||||||
|
int log_sndhwm = 1000; // libzmq default
|
||||||
|
int log_sndbuf = -1; // OS defaults
|
||||||
|
int log_rcvhwm = 1000; // libzmq defaults
|
||||||
|
int log_rcvbuf = -1; // OS defaults
|
||||||
|
|
||||||
zmq::context_t ctx;
|
zmq::context_t ctx;
|
||||||
zmq::socket_t xsub;
|
zmq::socket_t xsub;
|
||||||
zmq::socket_t xpub;
|
zmq::socket_t xpub;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue