mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 06:38:20 +00:00
Merge remote-tracking branch 'origin/master' into topic/johanna/remove-serializer
This commit is contained in:
commit
a50b06d6c1
543 changed files with 6954 additions and 6478 deletions
|
@ -84,17 +84,17 @@ struct scoped_reporter_location {
|
|||
};
|
||||
|
||||
#ifdef DEBUG
|
||||
static std::string RenderMessage(std::string topic, broker::data x)
|
||||
static std::string RenderMessage(std::string topic, const broker::data& x)
|
||||
{
|
||||
return fmt("%s -> %s", broker::to_string(x).c_str(), topic.c_str());
|
||||
}
|
||||
|
||||
static std::string RenderEvent(std::string topic, std::string name, broker::data args)
|
||||
static std::string RenderEvent(std::string topic, std::string name, const broker::data& args)
|
||||
{
|
||||
return fmt("%s(%s) -> %s", name.c_str(), broker::to_string(args).c_str(), topic.c_str());
|
||||
}
|
||||
|
||||
static std::string RenderMessage(broker::store::response x)
|
||||
static std::string RenderMessage(const broker::store::response& x)
|
||||
{
|
||||
return fmt("%s [id %" PRIu64 "]", (x.answer ? broker::to_string(*x.answer).c_str() : "<no answer>"), x.id);
|
||||
}
|
||||
|
@ -178,7 +178,7 @@ void Manager::InitPostScript()
|
|||
|
||||
BrokerConfig config{std::move(options)};
|
||||
|
||||
auto max_threads_env = getenv("BRO_BROKER_MAX_THREADS");
|
||||
auto max_threads_env = zeekenv("ZEEK_BROKER_MAX_THREADS");
|
||||
|
||||
if ( max_threads_env )
|
||||
config.set("scheduler.max-threads", atoi(max_threads_env));
|
||||
|
@ -304,7 +304,7 @@ void Manager::Peer(const string& addr, uint16_t port, double retry)
|
|||
DBG_LOG(DBG_BROKER, "Starting to peer with %s:%" PRIu16,
|
||||
addr.c_str(), port);
|
||||
|
||||
auto e = getenv("BRO_DEFAULT_CONNECT_RETRY");
|
||||
auto e = zeekenv("ZEEK_DEFAULT_CONNECT_RETRY");
|
||||
|
||||
if ( e )
|
||||
retry = atoi(e);
|
||||
|
@ -359,7 +359,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
|
|||
DBG_LOG(DBG_BROKER, "Publishing event: %s",
|
||||
RenderEvent(topic, name, args).c_str());
|
||||
broker::zeek::Event ev(std::move(name), std::move(args));
|
||||
bstate->endpoint.publish(move(topic), std::move(ev));
|
||||
bstate->endpoint.publish(move(topic), ev.move_data());
|
||||
++statistics.num_events_outgoing;
|
||||
return true;
|
||||
}
|
||||
|
@ -421,8 +421,8 @@ bool Manager::PublishIdentifier(std::string topic, std::string id)
|
|||
|
||||
broker::zeek::IdentifierUpdate msg(move(id), move(*data));
|
||||
DBG_LOG(DBG_BROKER, "Publishing id-update: %s",
|
||||
RenderMessage(topic, msg).c_str());
|
||||
bstate->endpoint.publish(move(topic), move(msg));
|
||||
RenderMessage(topic, msg.as_data()).c_str());
|
||||
bstate->endpoint.publish(move(topic), msg.move_data());
|
||||
++statistics.num_ids_outgoing;
|
||||
return true;
|
||||
}
|
||||
|
@ -472,14 +472,14 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer,
|
|||
auto bwriter_id = broker::enum_value(move(writer_id));
|
||||
broker::zeek::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data));
|
||||
|
||||
DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg).c_str());
|
||||
DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg.as_data()).c_str());
|
||||
|
||||
if ( peer.node != NoPeer.node )
|
||||
// Direct message.
|
||||
bstate->endpoint.publish(peer, move(topic), move(msg));
|
||||
bstate->endpoint.publish(peer, move(topic), msg.move_data());
|
||||
else
|
||||
// Broadcast.
|
||||
bstate->endpoint.publish(move(topic), move(msg));
|
||||
bstate->endpoint.publish(move(topic), msg.move_data());
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -561,7 +561,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
|
|||
broker::zeek::LogWrite msg(move(bstream_id), move(bwriter_id), move(path),
|
||||
move(serial_data));
|
||||
|
||||
DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg).c_str());
|
||||
DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg.as_data()).c_str());
|
||||
|
||||
if ( log_buffers.size() <= (unsigned int)stream_id_num )
|
||||
log_buffers.resize(stream_id_num + 1);
|
||||
|
@ -569,7 +569,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
|
|||
auto& lb = log_buffers[stream_id_num];
|
||||
++lb.message_count;
|
||||
auto& pending_batch = lb.msgs[topic];
|
||||
pending_batch.emplace_back(std::move(msg));
|
||||
pending_batch.emplace_back(msg.move_data());
|
||||
|
||||
if ( lb.message_count >= log_batch_size ||
|
||||
(network_time - lb.last_flush >= log_batch_interval ) )
|
||||
|
@ -595,7 +595,7 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_si
|
|||
batch.reserve(log_batch_size + 1);
|
||||
pending_batch.swap(batch);
|
||||
broker::zeek::Batch msg(std::move(batch));
|
||||
endpoint.publish(topic, move(msg));
|
||||
endpoint.publish(topic, msg.move_data());
|
||||
}
|
||||
|
||||
auto rval = message_count;
|
||||
|
@ -773,6 +773,15 @@ bool Manager::Subscribe(const string& topic_prefix)
|
|||
{
|
||||
DBG_LOG(DBG_BROKER, "Subscribing to topic prefix %s", topic_prefix.c_str());
|
||||
bstate->subscriber.add_topic(topic_prefix, ! after_zeek_init);
|
||||
|
||||
// For backward compatibility, we also may receive messages on
|
||||
// "bro/" topic prefixes in addition to "zeek/".
|
||||
if ( strncmp(topic_prefix.data(), "zeek/", 5) == 0 )
|
||||
{
|
||||
std::string alt_topic = "bro/" + topic_prefix.substr(5);
|
||||
bstate->subscriber.add_topic(std::move(alt_topic), ! after_zeek_init);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -806,15 +815,8 @@ bool Manager::Unsubscribe(const string& topic_prefix)
|
|||
void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
||||
iosource::FD_Set* except)
|
||||
{
|
||||
if ( bstate->status_subscriber.available() || bstate->subscriber.available() )
|
||||
SetIdle(false);
|
||||
|
||||
read->Insert(bstate->subscriber.fd());
|
||||
read->Insert(bstate->status_subscriber.fd());
|
||||
write->Insert(bstate->subscriber.fd());
|
||||
write->Insert(bstate->status_subscriber.fd());
|
||||
except->Insert(bstate->subscriber.fd());
|
||||
except->Insert(bstate->status_subscriber.fd());
|
||||
|
||||
for ( auto& x : data_stores )
|
||||
read->Insert(x.second->proxy.mailbox().descriptor());
|
||||
|
@ -822,19 +824,10 @@ void Manager::GetFds(iosource::FD_Set* read, iosource::FD_Set* write,
|
|||
|
||||
double Manager::NextTimestamp(double* local_network_time)
|
||||
{
|
||||
if ( ! IsIdle() )
|
||||
return timer_mgr->Time();
|
||||
|
||||
if ( bstate->status_subscriber.available() || bstate->subscriber.available() )
|
||||
return timer_mgr->Time();
|
||||
|
||||
for ( auto& s : data_stores )
|
||||
{
|
||||
if ( ! s.second->proxy.mailbox().empty() )
|
||||
return timer_mgr->Time();
|
||||
}
|
||||
|
||||
return -1;
|
||||
// We're only asked for a timestamp if either (1) a FD was ready
|
||||
// or (2) we're not idle (and we go idle if when Process is no-op),
|
||||
// so there's no case where returning -1 to signify a skip will help.
|
||||
return timer_mgr->Time();
|
||||
}
|
||||
|
||||
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
|
||||
|
@ -934,11 +927,15 @@ void Manager::Process()
|
|||
|
||||
for ( auto& s : data_stores )
|
||||
{
|
||||
while ( ! s.second->proxy.mailbox().empty() )
|
||||
auto num_available = s.second->proxy.mailbox().size();
|
||||
|
||||
if ( num_available > 0 )
|
||||
{
|
||||
had_input = true;
|
||||
auto response = s.second->proxy.receive();
|
||||
ProcessStoreResponse(s.second, move(response));
|
||||
auto responses = s.second->proxy.receive(num_available);
|
||||
|
||||
for ( auto& r : responses )
|
||||
ProcessStoreResponse(s.second, move(r));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -976,7 +973,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
|
|||
if ( ! ev.valid() )
|
||||
{
|
||||
reporter->Warning("received invalid broker Event: %s",
|
||||
broker::to_string(ev).data());
|
||||
broker::to_string(ev.as_data()).data());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1049,7 +1046,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
|
|||
|
||||
bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());
|
||||
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc.as_data()).c_str());
|
||||
if ( ! lc.valid() )
|
||||
{
|
||||
reporter->Warning("received invalid broker LogCreate: %s",
|
||||
|
@ -1119,7 +1116,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
|
|||
|
||||
bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str());
|
||||
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw.as_data()).c_str());
|
||||
|
||||
if ( ! lw.valid() )
|
||||
{
|
||||
|
@ -1206,7 +1203,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
|
|||
|
||||
bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
|
||||
{
|
||||
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str());
|
||||
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu.as_data()).c_str());
|
||||
|
||||
if ( ! iu.valid() )
|
||||
{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue