This commit is contained in:
Arne Welzel 2025-09-25 16:38:10 -07:00 committed by GitHub
commit 670fa055e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 48 additions and 15 deletions

View file

@ -106,17 +106,25 @@ bool WriterBackend::WriterInfo::FromBroker(broker::data d) {
return true;
}
WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread() {
// Remove in v9.1
WriterBackend::WriterBackend(WriterFrontend* arg_frontend, bool arg_send_heartbeats) : MsgThread() {
num_fields = 0;
fields = nullptr;
buffering = true;
frontend = arg_frontend;
info = new WriterInfo(frontend->Info());
rotation_counter = 0;
send_heartbeats = arg_send_heartbeats;
SetName(frontend->Name());
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
WriterBackend::WriterBackend(NoThreadingHeartbeats, WriterFrontend* arg_frontend)
: WriterBackend(arg_frontend, false) {}
#pragma GCC diagnostic pop
WriterBackend::~WriterBackend() {
if ( fields ) {
for ( int i = 0; i < num_fields; ++i )
@ -283,6 +291,13 @@ bool WriterBackend::Flush(double network_time) {
return true;
}
void WriterBackend::Heartbeat() {
if ( ! send_heartbeats )
return;
MsgThread::Heartbeat();
}
bool WriterBackend::OnFinish(double network_time) {
if ( Failed() )
return true;

View file

@ -38,9 +38,28 @@ public:
* frontend, it's running in a different thread.
*
* @param name A descriptive name for writer's type (e.g., \c Ascii).
*
*/
explicit WriterBackend(WriterFrontend* frontend);
[[deprecated(
"Remove in v9.1: Use WriterBackend(NoThreadingHeartbeats{}, frontend). If you have a use for heartbeats in "
"your WriterBackend, trigger these via a dedicated timer instead.")]]
explicit WriterBackend(WriterFrontend* frontend, bool send_heartbeats = true);
/**
* Marker struct to disable threading heartbeats.
*/
struct NoThreadingHeartbeats {};
/**
* Constructor that disables heartbeats.
*
* @param frontend The frontend writer that created this backend. The
* *only* purpose of this value is to be passed back via messages as
* a argument to callbacks. One must not otherwise access the
* frontend, it's running in a different thread.
*
* @param name A descriptive name for writer's type (e.g., \c Ascii).
*/
WriterBackend(NoThreadingHeartbeats, WriterFrontend* frontend);
/**
* Destructor.
@ -262,6 +281,11 @@ public:
protected:
friend class FinishMessage;
/**
* Overridden from MsgThread to skip sending heartbeats if disabled.
*/
void Heartbeat() override;
/**
* Writer-specific initialization method.
*
@ -371,7 +395,7 @@ protected:
* This method can be overridden. Default implementation does
* nothing.
*/
virtual bool DoHeartbeat(double network_time, double current_time) = 0;
virtual bool DoHeartbeat(double network_time, double current_time) { return true; }
private:
/**
@ -389,6 +413,8 @@ private:
bool buffering; // True if buffering is enabled.
int rotation_counter; // Tracks FinishedRotation() calls.
bool send_heartbeats;
};
} // namespace zeek::logging

View file

@ -205,7 +205,7 @@ static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname) {
return rval;
}
Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) {
Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(NoThreadingHeartbeats{}, frontend) {
fd = 0;
ascii_done = false;
output_to_stdout = false;
@ -664,11 +664,6 @@ bool Ascii::DoSetBuf(bool enabled) {
return true;
}
bool Ascii::DoHeartbeat(double network_time, double current_time) {
// Nothing to do.
return true;
}
static std::vector<LeftoverLog> find_leftover_logs() {
std::vector<LeftoverLog> rval;
std::vector<std::string> stale_shadow_files;

View file

@ -33,7 +33,6 @@ protected:
bool DoRotate(const char* rotated_path, double open, double close, bool terminating) override;
bool DoFlush(double network_time) override;
bool DoFinish(double network_time) override;
bool DoHeartbeat(double network_time, double current_time) override;
private:
friend class plugin::detail::Zeek_AsciiWriter::Plugin;

View file

@ -10,7 +10,7 @@ namespace zeek::logging::writer::detail {
class None : public WriterBackend {
public:
explicit None(WriterFrontend* frontend) : WriterBackend(frontend) {}
explicit None(WriterFrontend* frontend) : WriterBackend(NoThreadingHeartbeats{}, frontend) {}
~None() override {};
static WriterBackend* Instantiate(WriterFrontend* frontend) { return new None(frontend); }
@ -24,7 +24,6 @@ protected:
bool DoRotate(const char* rotated_path, double open, double close, bool terminating) override;
bool DoFlush(double network_time) override { return true; }
bool DoFinish(double network_time) override { return true; }
bool DoHeartbeat(double network_time, double current_time) override { return true; }
};
} // namespace zeek::logging::writer::detail

View file

@ -17,7 +17,7 @@ using zeek::threading::Value;
namespace zeek::logging::writer::detail {
SQLite::SQLite(WriterFrontend* frontend) : WriterBackend(frontend) {
SQLite::SQLite(WriterFrontend* frontend) : WriterBackend(NoThreadingHeartbeats{}, frontend) {
set_separator.assign((const char*)BifConst::LogSQLite::set_separator->Bytes(),
BifConst::LogSQLite::set_separator->Len());

View file

@ -24,7 +24,6 @@ protected:
bool DoRotate(const char* rotated_path, double open, double close, bool terminating) override;
bool DoFlush(double network_time) override { return true; }
bool DoFinish(double network_time) override { return true; }
bool DoHeartbeat(double network_time, double current_time) override { return true; }
private:
bool checkError(int code);