Merge remote-tracking branch 'origin/topic/dnthayer/gh-359'

* origin/topic/dnthayer/gh-359:
  Changes needed due to bro-to-zeek renaming in broker
This commit is contained in:
Jon Siwek 2019-05-15 15:03:11 -07:00
commit bf42dd02bd
6 changed files with 30 additions and 26 deletions

View file

@ -1,4 +1,8 @@
2.6-303 | 2019-05-15 15:03:11 -0700
* Changes needed due to bro-to-zeek renaming in broker (Daniel Thayer)
2.6-301 | 2019-05-15 10:05:53 -0700 2.6-301 | 2019-05-15 10:05:53 -0700
* Fix potential race in openflow broker plugin (Jon Siwek, Corelight) * Fix potential race in openflow broker plugin (Jon Siwek, Corelight)

View file

@ -1 +1 @@
2.6-301 2.6-303

@ -1 +1 @@
Subproject commit bef5e1460a8814b47a3cfc8073eb4be2479e74f6 Subproject commit 5e3684f1b69a282b831c9d1b72ed924b510f22f0

@ -1 +1 @@
Subproject commit 9ca96b333c1a2df8992a5a6e208707acc28eb9c2 Subproject commit bbfcb91b077a8bc145e39d7c941c50ba62826070

View file

@ -1,6 +1,6 @@
#include <broker/broker.hh> #include <broker/broker.hh>
#include <broker/bro.hh> #include <broker/zeek.hh>
#include <cstdio> #include <cstdio>
#include <cstring> #include <cstring>
#include <unistd.h> #include <unistd.h>
@ -357,7 +357,7 @@ bool Manager::PublishEvent(string topic, std::string name, broker::vector args)
DBG_LOG(DBG_BROKER, "Publishing event: %s", DBG_LOG(DBG_BROKER, "Publishing event: %s",
RenderEvent(topic, name, args).c_str()); RenderEvent(topic, name, args).c_str());
broker::bro::Event ev(std::move(name), std::move(args)); broker::zeek::Event ev(std::move(name), std::move(args));
bstate->endpoint.publish(move(topic), std::move(ev)); bstate->endpoint.publish(move(topic), std::move(ev));
++statistics.num_events_outgoing; ++statistics.num_events_outgoing;
return true; return true;
@ -418,7 +418,7 @@ bool Manager::PublishIdentifier(std::string topic, std::string id)
return false; return false;
} }
broker::bro::IdentifierUpdate msg(move(id), move(*data)); broker::zeek::IdentifierUpdate msg(move(id), move(*data));
DBG_LOG(DBG_BROKER, "Publishing id-update: %s", DBG_LOG(DBG_BROKER, "Publishing id-update: %s",
RenderMessage(topic, msg).c_str()); RenderMessage(topic, msg).c_str());
bstate->endpoint.publish(move(topic), move(msg)); bstate->endpoint.publish(move(topic), move(msg));
@ -469,7 +469,7 @@ bool Manager::PublishLogCreate(EnumVal* stream, EnumVal* writer,
std::string topic = default_log_topic_prefix + stream_id; std::string topic = default_log_topic_prefix + stream_id;
auto bstream_id = broker::enum_value(move(stream_id)); auto bstream_id = broker::enum_value(move(stream_id));
auto bwriter_id = broker::enum_value(move(writer_id)); auto bwriter_id = broker::enum_value(move(writer_id));
broker::bro::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data)); broker::zeek::LogCreate msg(move(bstream_id), move(bwriter_id), move(writer_info), move(fields_data));
DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg).c_str()); DBG_LOG(DBG_BROKER, "Publishing log creation: %s", RenderMessage(topic, msg).c_str());
@ -557,7 +557,7 @@ bool Manager::PublishLogWrite(EnumVal* stream, EnumVal* writer, string path, int
auto bstream_id = broker::enum_value(move(stream_id)); auto bstream_id = broker::enum_value(move(stream_id));
auto bwriter_id = broker::enum_value(move(writer_id)); auto bwriter_id = broker::enum_value(move(writer_id));
broker::bro::LogWrite msg(move(bstream_id), move(bwriter_id), move(path), broker::zeek::LogWrite msg(move(bstream_id), move(bwriter_id), move(path),
move(serial_data)); move(serial_data));
DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg).c_str()); DBG_LOG(DBG_BROKER, "Buffering log record: %s", RenderMessage(topic, msg).c_str());
@ -593,7 +593,7 @@ size_t Manager::LogBuffer::Flush(broker::endpoint& endpoint, size_t log_batch_si
broker::vector batch; broker::vector batch;
batch.reserve(log_batch_size + 1); batch.reserve(log_batch_size + 1);
pending_batch.swap(batch); pending_batch.swap(batch);
broker::bro::Batch msg(std::move(batch)); broker::zeek::Batch msg(std::move(batch));
endpoint.publish(topic, move(msg)); endpoint.publish(topic, move(msg));
} }
@ -838,31 +838,31 @@ double Manager::NextTimestamp(double* local_network_time)
void Manager::DispatchMessage(const broker::topic& topic, broker::data msg) void Manager::DispatchMessage(const broker::topic& topic, broker::data msg)
{ {
switch ( broker::bro::Message::type(msg) ) { switch ( broker::zeek::Message::type(msg) ) {
case broker::bro::Message::Type::Invalid: case broker::zeek::Message::Type::Invalid:
reporter->Warning("received invalid broker message: %s", reporter->Warning("received invalid broker message: %s",
broker::to_string(msg).data()); broker::to_string(msg).data());
break; break;
case broker::bro::Message::Type::Event: case broker::zeek::Message::Type::Event:
ProcessEvent(topic, std::move(msg)); ProcessEvent(topic, std::move(msg));
break; break;
case broker::bro::Message::Type::LogCreate: case broker::zeek::Message::Type::LogCreate:
ProcessLogCreate(std::move(msg)); ProcessLogCreate(std::move(msg));
break; break;
case broker::bro::Message::Type::LogWrite: case broker::zeek::Message::Type::LogWrite:
ProcessLogWrite(std::move(msg)); ProcessLogWrite(std::move(msg));
break; break;
case broker::bro::Message::Type::IdentifierUpdate: case broker::zeek::Message::Type::IdentifierUpdate:
ProcessIdentifierUpdate(std::move(msg)); ProcessIdentifierUpdate(std::move(msg));
break; break;
case broker::bro::Message::Type::Batch: case broker::zeek::Message::Type::Batch:
{ {
broker::bro::Batch batch(std::move(msg)); broker::zeek::Batch batch(std::move(msg));
if ( ! batch.valid() ) if ( ! batch.valid() )
{ {
@ -970,7 +970,7 @@ void Manager::Process()
} }
void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev) void Manager::ProcessEvent(const broker::topic& topic, broker::zeek::Event ev)
{ {
if ( ! ev.valid() ) if ( ! ev.valid() )
{ {
@ -1046,7 +1046,7 @@ void Manager::ProcessEvent(const broker::topic& topic, broker::bro::Event ev)
} }
} }
bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc) bool bro_broker::Manager::ProcessLogCreate(broker::zeek::LogCreate lc)
{ {
DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str()); DBG_LOG(DBG_BROKER, "Received log-create: %s", RenderMessage(lc).c_str());
if ( ! lc.valid() ) if ( ! lc.valid() )
@ -1116,7 +1116,7 @@ bool bro_broker::Manager::ProcessLogCreate(broker::bro::LogCreate lc)
return true; return true;
} }
bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw) bool bro_broker::Manager::ProcessLogWrite(broker::zeek::LogWrite lw)
{ {
DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str()); DBG_LOG(DBG_BROKER, "Received log-write: %s", RenderMessage(lw).c_str());
@ -1203,7 +1203,7 @@ bool bro_broker::Manager::ProcessLogWrite(broker::bro::LogWrite lw)
return true; return true;
} }
bool Manager::ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu) bool Manager::ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu)
{ {
DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str()); DBG_LOG(DBG_BROKER, "Received id-update: %s", RenderMessage(iu).c_str());

View file

@ -2,7 +2,7 @@
#define BRO_COMM_MANAGER_H #define BRO_COMM_MANAGER_H
#include <broker/broker.hh> #include <broker/broker.hh>
#include <broker/bro.hh> #include <broker/zeek.hh>
#include <memory> #include <memory>
#include <string> #include <string>
#include <map> #include <map>
@ -324,10 +324,10 @@ public:
private: private:
void DispatchMessage(const broker::topic& topic, broker::data msg); void DispatchMessage(const broker::topic& topic, broker::data msg);
void ProcessEvent(const broker::topic& topic, broker::bro::Event ev); void ProcessEvent(const broker::topic& topic, broker::zeek::Event ev);
bool ProcessLogCreate(broker::bro::LogCreate lc); bool ProcessLogCreate(broker::zeek::LogCreate lc);
bool ProcessLogWrite(broker::bro::LogWrite lw); bool ProcessLogWrite(broker::zeek::LogWrite lw);
bool ProcessIdentifierUpdate(broker::bro::IdentifierUpdate iu); bool ProcessIdentifierUpdate(broker::zeek::IdentifierUpdate iu);
void ProcessStatus(broker::status stat); void ProcessStatus(broker::status stat);
void ProcessError(broker::error err); void ProcessError(broker::error err);
void ProcessStoreResponse(StoreHandleVal*, broker::store::response response); void ProcessStoreResponse(StoreHandleVal*, broker::store::response response);