diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc
index 98e3bb6ff6..4321672d8b 100644
--- a/src/logging/Manager.cc
+++ b/src/logging/Manager.cc
@@ -1170,6 +1170,11 @@ WriterFrontend* Manager::CreateWriter(zeek::EnumVal* id, zeek::EnumVal* writer,
 
 				winfo->interval = f->interval;
 				winfo->postprocessor = f->postprocessor;
+				if ( f->postprocessor )
+					{
+					delete [] winfo->info->post_proc_func;
+					winfo->info->post_proc_func = copy_string(f->postprocessor->Name());
+					}
 				break;
 				}
 
@@ -1180,6 +1185,18 @@ WriterFrontend* Manager::CreateWriter(zeek::EnumVal* id, zeek::EnumVal* writer,
 			const auto& id = zeek::detail::global_scope()->Find("Log::default_rotation_interval");
 			assert(id);
 			winfo->interval = id->GetVal()->AsInterval();
+
+			if ( winfo->info->post_proc_func &&
+			     strlen(winfo->info->post_proc_func) )
+				{
+				auto func = zeek::id::find_func(winfo->info->post_proc_func);
+
+				if ( func )
+					winfo->postprocessor = func.get();
+				else
+					reporter->Warning("failed log postprocessor function lookup: %s\n",
+					                  winfo->info->post_proc_func);
+				}
 			}
 
 		stream->writers.insert(
@@ -1466,23 +1483,32 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
 		}
 	}
 
+std::string Manager::FormatRotationTime(time_t t)
+	{
+	struct tm tm;
+	char buf[128];
+	const char* const date_fmt = "%y-%m-%d_%H.%M.%S";
+	localtime_r(&t, &tm);
+	strftime(buf, sizeof(buf), date_fmt, &tm);
+	return buf;
+	}
+
+std::string Manager::FormatRotationPath(std::string_view path, time_t t)
+	{
+	auto rot_str = FormatRotationTime(t);
+	return fmt("%.*s-%s",
+	           static_cast<int>(path.size()), path.data(), rot_str.data());
+	}
+
 void Manager::Rotate(WriterInfo* winfo)
 	{
 	DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
 		winfo->writer->Name(), network_time);
 
 	// Build a temporary path for the writer to move the file to.
-	struct tm tm;
-	char buf[128];
-	const char* const date_fmt = "%y-%m-%d_%H.%M.%S";
-	time_t teatime = (time_t)winfo->open_time;
-
-	localtime_r(&teatime, &tm);
-	strftime(buf, sizeof(buf), date_fmt, &tm);
-
-	// Trigger the rotation.
-	const char* tmp = fmt("%s-%s", winfo->writer->Info().path, buf);
-	winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
+	auto tmp = FormatRotationPath(winfo->writer->Info().path,
+	                              (time_t)winfo->open_time);
+	winfo->writer->Rotate(tmp.data(), winfo->open_time, network_time, terminating);
 
 	++rotations_pending;
 	}
diff --git a/src/logging/Manager.h b/src/logging/Manager.h
index 620680938d..99d2105ecc 100644
--- a/src/logging/Manager.h
+++ b/src/logging/Manager.h
@@ -4,6 +4,9 @@
 
 #pragma once
 
+#include
+#include
+
 #include "../Val.h"
 #include "../Tag.h"
 #include "../EventHandler.h"
@@ -26,6 +29,19 @@ class RotationFinishedMessage;
  */
 class Manager : public plugin::ComponentManager {
 public:
+
+	/**
+	 * Returns a formatted string representing the given time. This
+	 * string is used in the log file rotation process.
+	 */
+	static std::string FormatRotationTime(time_t t);
+
+	/**
+	 * Returns a formatted string representing the file rotation path. This
+	 * string is used in the log file rotation process.
+	 */
+	static std::string FormatRotationPath(std::string_view path, time_t t);
+
 	/**
 	 * Constructor.
 	 */
diff --git a/src/logging/WriterBackend.cc b/src/logging/WriterBackend.cc
index b3bbe8b13e..fc1e97a3f0 100644
--- a/src/logging/WriterBackend.cc
+++ b/src/logging/WriterBackend.cc
@@ -80,7 +80,9 @@ broker::data WriterBackend::WriterInfo::ToBroker() const
 		t.insert(std::make_pair(key, value));
 		}
 
-	return broker::vector({path, rotation_base, rotation_interval, network_time, std::move(t)});
+	auto bppf = post_proc_func ? post_proc_func : "";
+
+	return broker::vector({path, rotation_base, rotation_interval, network_time, std::move(t), bppf});
 	}
 
 bool WriterBackend::WriterInfo::FromBroker(broker::data d)
@@ -94,11 +96,13 @@ bool WriterBackend::WriterInfo::FromBroker(broker::data d)
 	auto brotation_interval = caf::get_if(&v[2]);
 	auto bnetwork_time = caf::get_if(&v[3]);
 	auto bconfig = caf::get_if(&v[4]);
+	auto bppf = caf::get_if(&v[5]);
 
-	if ( ! (bpath && brotation_base && brotation_interval && bnetwork_time && bconfig) )
+	if ( ! (bpath && brotation_base && brotation_interval && bnetwork_time && bconfig && bppf) )
 		return false;
 
 	path = copy_string(bpath->c_str());
+	post_proc_func = copy_string(bppf->c_str());
 	rotation_base = *brotation_base;
 	rotation_interval = *brotation_interval;
 	network_time = *bnetwork_time;
diff --git a/src/logging/WriterBackend.h b/src/logging/WriterBackend.h
index 805b4f8b9a..14fe515dbf 100644
--- a/src/logging/WriterBackend.h
+++ b/src/logging/WriterBackend.h
@@ -61,6 +61,15 @@ public:
 		 */
 		const char* path;
 
+		/**
+		 * The name of the postprocessor function that will be called
+		 * upon the logging manager processing the "rotation finished"
+		 * message. A null or empty value means "use the default function".
+		 *
+		 * Structure takes ownership of string.
+		 */
+		const char* post_proc_func = nullptr;
+
 		/**
 		 * The rotation interval as configured for this writer.
 		 */
@@ -90,6 +99,7 @@ public:
 		WriterInfo(const WriterInfo& other)
 			{
 			path = other.path ? copy_string(other.path) : nullptr;
+			post_proc_func = other.post_proc_func ? copy_string(other.post_proc_func) : nullptr;
 			rotation_interval = other.rotation_interval;
 			rotation_base = other.rotation_base;
 			network_time = other.network_time;
@@ -101,6 +111,7 @@ public:
 		~WriterInfo()
 			{
 			delete [] path;
+			delete [] post_proc_func;
 
 			for ( config_map::iterator i = config.begin(); i != config.end(); i++ )
 				{
diff --git a/src/logging/writers/ascii/Ascii.cc b/src/logging/writers/ascii/Ascii.cc
index 47cceeb738..94d6e7490d 100644
--- a/src/logging/writers/ascii/Ascii.cc
+++ b/src/logging/writers/ascii/Ascii.cc
@@ -1,10 +1,22 @@
 // See the file "COPYING" in the main distribution directory for copyright.
 
+#include
+#include
 #include
+#include
+#include
+#include
+
 #include
 #include
+#include
+#include
 #include
+#include
 
+#include "Func.h"
+#include "supervisor/Supervisor.h"
+#include "logging/Manager.h"
 #include "threading/SerialTypes.h"
 
 #include "Ascii.h"
@@ -16,6 +28,174 @@ using namespace threading;
 using threading::Value;
 using threading::Field;
 
+static constexpr auto shadow_file_prefix = ".shadow.";
+
+/**
+ * Information about a leftover log file: that is, one that a previous
+ * process was in the middle of writing, but never completed a rotation
+ * for whatever reason (prematurely crashed/killed).
+ */
+struct LeftoverLog {
+	/*
+	 * Name of leftover log, relative to working dir.
+	 */
+	std::string filename;
+
+	/*
+	 * File extension of the leftover log (e.g. ".log").
+	 */
+	std::string extension;
+
+	/*
+	 * Name of shadow file associated with the log.
+	 * The shadow file's existence is what indicates the presence of
+	 * a "leftover log" and may contain the name of a postprocessing
+	 * function that's supposed to be called after rotating (only
+	 * named if that function differs from the default). Upon
+	 * completing a rotation, the shadow file can be deleted.
+	 */
+	std::string shadow_filename;
+
+	/**
+	 * Name of a function to call to postprocess the log file after
+	 * rotating.
+	 */
+	std::string post_proc_func;
+
+	/**
+	 * The time at which the shadow file was created. This is used
+	 * as the log file's "opening time" for rotation purposes.
+	 */
+	time_t open_time = 0;
+
+	/**
+	 * Time of the log file's last modification. This is used
+	 * as the log file's "closing time" for rotation purposes.
+	 */
+	time_t close_time = 0;
+
+	/**
+	 * Set to an error message explaining any error that happened while
+	 * trying to parse the shadow file and construct an object.
+	 */
+	std::string error;
+
+	/**
+	 * Return the name of the log without file extension (this is
+	 * what's called "path" in other logging framework parlance).
+	 * E.g. the name of "conn.log" is just "conn".
+	 */
+	std::string Name() const
+		{ return filename.substr(0, filename.size() - extension.size()); }
+
+	/**
+	 * The path to which the log file should be rotated (before
+	 * calling any postprocessing function).
+	 */
+	std::string RotationPath() const
+		{ return log_mgr->FormatRotationPath(Name(), open_time) + extension; }
+
+	/**
+	 * Performs the rename() call to rotate the file and returns whether
+	 * it succeeded.
+	 */
+	bool Rename() const
+		{ return rename(filename.data(), RotationPath().data()) == 0; }
+
+	/**
+	 * Deletes the shadow file and returns whether it succeeded.
+	 */
+	bool DeleteShadow() const
+		{ return unlink(shadow_filename.data()) == 0; }
+};
+
+/**
+ * Reads the shadow file belonging to log file "fname" and gathers what is
+ * needed to rotate that log. Failures are reported through the returned
+ * object's "error" member rather than by returning an empty optional.
+ */
+static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
+	{
+	char errbuf[512];
+	auto sfname = shadow_file_prefix + fname;
+
+	LeftoverLog rval;
+	rval.filename = fname;
+	rval.shadow_filename = std::move(sfname);
+
+	auto sf_stream = fopen(rval.shadow_filename.data(), "r");
+
+	if ( ! sf_stream )
+		{
+		bro_strerror_r(errno, errbuf, sizeof(errbuf));
+		rval.error = "Failed to open " + rval.shadow_filename + ": " + errbuf;
+		return rval;
+		}
+
+	// ftell() returns -1 on failure; never use that as a buffer size.
+	long sf_len = fseek(sf_stream, 0, SEEK_END) == 0 ? ftell(sf_stream) : -1;
+
+	if ( sf_len < 0 || fseek(sf_stream, 0, SEEK_SET) != 0 )
+		{
+		fclose(sf_stream);
+		rval.error = "Failed to determine size of " + rval.shadow_filename;
+		return rval;
+		}
+
+	auto sf_content = std::make_unique<char[]>(sf_len);
+	auto bytes_read = fread(sf_content.get(), 1, sf_len, sf_stream);
+
+	// The stream is no longer needed; closing it before the length check
+	// keeps it from leaking when the read comes up short.
+	fclose(sf_stream);
+
+	if ( bytes_read != static_cast<size_t>(sf_len) )
+		{
+		rval.error = "Failed to read contents of " + rval.shadow_filename;
+		return rval;
+		}
+
+	std::string_view sf_view(sf_content.get(), sf_len);
+	auto sf_lines = tokenize_string(sf_view, '\n');
+
+	if ( sf_lines.size() < 2 )
+		{
+		snprintf(errbuf, sizeof(errbuf),
+		         "Found leftover log, '%s', but the associated shadow file, "
+		         "'%s', required to process it is invalid",
+		         rval.filename.data(), rval.shadow_filename.data());
+		rval.error = errbuf;
+		return rval;
+		}
+
+	rval.extension = sf_lines[0];
+	rval.post_proc_func = sf_lines[1];
+
+	struct stat st;
+
+	// Use the shadow file's status-change time (st_ctime) as the log's opening time.
+	if ( stat(rval.shadow_filename.data(), &st) != 0 )
+		{
+		bro_strerror_r(errno, errbuf, sizeof(errbuf));
+		rval.error = "Failed to stat " + rval.shadow_filename + ": " + errbuf;
+		return rval;
+		}
+
+	rval.open_time = st.st_ctime;
+
+	// Use log file's modification time for closing time.
+	if ( stat(rval.filename.data(), &st) != 0 )
+		{
+		bro_strerror_r(errno, errbuf, sizeof(errbuf));
+		rval.error = "Failed to stat " + rval.filename + ": " + errbuf;
+		return rval;
+		}
+
+	rval.close_time = st.st_mtime;
+
+	return rval;
+	}
+
 Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
 	{
 	fd = 0;
@@ -261,12 +441,45 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
 	if ( output_to_stdout )
 		path = "/dev/stdout";
 
-	fname = IsSpecial(path) ? path : path + "." + LogExt();
+	fname = path;
 
-	if ( gzip_level > 0 )
+	if ( ! IsSpecial(fname) )
 		{
-		fname += ".";
-		fname += gzip_file_extension.empty() ? "gz" : gzip_file_extension;
+		std::string ext = "." + LogExt();
+
+		if ( gzip_level > 0 )
+			{
+			ext += ".";
+			ext += gzip_file_extension.empty() ? "gz" : gzip_file_extension;
+			}
+
+		fname += ext;
+
+		bool use_shadow = zeek::Supervisor::ThisNode() && info.rotation_interval > 0;
+
+		if ( use_shadow )
+			{
+			auto sfname = shadow_file_prefix + fname;
+			auto sfd = open(sfname.data(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
+
+			if ( sfd < 0 )
+				{
+				Error(Fmt("cannot open %s: %s", sfname.data(), Strerror(errno)));
+				return false;
+				}
+
+			safe_write(sfd, ext.data(), ext.size());
+			safe_write(sfd, "\n", 1);
+
+			auto ppf = info.post_proc_func;
+
+			if ( ppf )
+				safe_write(sfd, ppf, strlen(ppf));
+
+			safe_write(sfd, "\n", 1);
+
+			safe_close(sfd);
+			}
 		}
 
 	fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
@@ -459,6 +672,20 @@ bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool t
 		return false;
 		}
 
+	bool use_shadow = zeek::Supervisor::ThisNode() && Info().rotation_interval > 0;
+
+	if ( use_shadow )
+		{
+		auto sfname = shadow_file_prefix + fname;
+
+		if ( unlink(sfname.data()) != 0 )
+			{
+			Error(Fmt("cannot unlink %s: %s", sfname.data(), Strerror(errno)));
+			FinishedRotation();
+			return false;
+			}
+		}
+
 	if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) )
 		{
 		Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
@@ -480,6 +707,130 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
 	return true;
 	}
 
+/**
+ * Scans the working directory for shadow files. Shadow files that still
+ * have a log file next to them yield a LeftoverLog entry; shadow files
+ * whose log is gone are deleted.
+ */
+static std::vector<LeftoverLog> find_leftover_logs()
+	{
+	std::vector<LeftoverLog> rval;
+	std::vector<std::string> stale_shadow_files;
+	auto prefix_len = strlen(shadow_file_prefix);
+
+	auto d = opendir(".");
+
+	if ( ! d )
+		{
+		reporter->Error("cannot open working directory to look for leftover logs: %s",
+		                strerror(errno));
+		return rval;
+		}
+
+	struct dirent* dp;
+
+	while ( (dp = readdir(d)) )
+		{
+		if ( strncmp(dp->d_name, shadow_file_prefix, prefix_len) != 0 )
+			continue;
+
+		std::string log_name = dp->d_name + prefix_len;
+
+		if ( is_file(log_name) )
+			{
+			if ( auto ll = parse_shadow_log(log_name) )
+				{
+				if ( ll->error.empty() )
+					rval.emplace_back(std::move(*ll));
+				else
+					reporter->Error("failed to process leftover log '%s': %s",
+					                log_name.data(), ll->error.data());
+				}
+			}
+		else
+			// There was a log here. It's gone now.
+			stale_shadow_files.emplace_back(dp->d_name);
+		}
+
+	for ( const auto& f : stale_shadow_files )
+		if ( unlink(f.data()) != 0 )
+			reporter->Error("cannot unlink %s: %s", f.data(), strerror(errno));
+
+	closedir(d);
+	return rval;
+	}
+
+void Ascii::RotateLeftoverLogs()
+	{
+	if ( ! zeek::Supervisor::ThisNode() )
+		return;
+
+	// Log file crash recovery: if there are still leftover shadow files from
+	// the ASCII log writer, attempt to rotate their associated log files. It
+	// might be better if the ASCII writer could implement the entire crash
+	// recovery logic itself without being called externally, but (1)
+	// this does need to get called from a particular point in the
+	// initialization process (after zeek_init()) and (2) the nature of writers
+	// being instantiated lazily means that trying to rotate a leftover log
+	// only upon seeing that an open() will clobber something means they'll
+	// possibly not be rotated in a timely manner (e.g. log files that are
+	// rarely written to). So the logic below drives the entire leftover log
+	// crash recovery process for a supervised node upon startup.
+	auto leftover_logs = find_leftover_logs();
+
+	for ( const auto& ll : leftover_logs )
+		{
+		if ( ! ll.Rename() )
+			reporter->FatalError("Found leftover/unprocessed log '%s', but "
+			                     "failed to rotate it: %s",
+			                     ll.filename.data(), strerror(errno));
+
+		if ( ! ll.DeleteShadow() )
+			// Unusual failure to report, but not strictly fatal.
+			reporter->Error("Failed to unlink %s: %s",
+			                ll.shadow_filename.data(), strerror(errno));
+
+		static auto rt = zeek::id::find_type("Log::RotationInfo");
+		static auto writer_type = zeek::id::find_type("Log::Writer");
+		static auto writer_idx = writer_type->Lookup("Log", "WRITER_ASCII");
+		static auto writer_val = writer_type->GetVal(writer_idx);
+
+		auto info = make_intrusive(rt);
+		info->Assign(0, writer_val);
+		info->Assign(1, make_intrusive(ll.RotationPath()));
+		info->Assign(2, make_intrusive(ll.Name()));
+		info->Assign(3, make_intrusive(ll.open_time));
+		info->Assign(4, make_intrusive(ll.close_time));
+		info->Assign(5, val_mgr->False());
+
+		auto ppf = ll.post_proc_func.empty() ? "Log::__default_rotation_postprocessor"
+		                                     : ll.post_proc_func.data();
+
+		auto func = zeek::id::find_func(ppf);
+
+		if ( ! func )
+			{
+			reporter->Error("Postprocessing log '%s' failed: "
+			                "no such function: '%s'",
+			                ll.filename.data(), ppf);
+			// Nothing to invoke; move on instead of dereferencing a null function.
+			continue;
+			}
+
+		try
+			{
+			func->Invoke(std::move(info));
+			reporter->Info("Rotated/postprocessed leftover log '%s'",
+			               ll.filename.data());
+			}
+		catch ( InterpreterException& e )
+			{
+			reporter->Info("Postprocess function '%s' failed for leftover log '%s'",
+			               ppf, ll.filename.data());
+			}
+		}
+	}
+
 string Ascii::LogExt()
 	{
 	const char* ext = zeekenv("ZEEK_LOG_SUFFIX");
diff --git a/src/logging/writers/ascii/Ascii.h b/src/logging/writers/ascii/Ascii.h
index c7376fe8fd..ada5463533 100644
--- a/src/logging/writers/ascii/Ascii.h
+++ b/src/logging/writers/ascii/Ascii.h
@@ -10,6 +10,8 @@
 #include "Desc.h"
 #include "zlib.h"
 
+namespace plugin::Zeek_AsciiWriter { class Plugin; }
+
 namespace logging { namespace writer {
 
 class Ascii : public WriterBackend {
@@ -35,6 +37,10 @@ protected:
 	bool DoHeartbeat(double network_time, double current_time) override;
 
 private:
+	friend class plugin::Zeek_AsciiWriter::Plugin;
+
+	static void RotateLeftoverLogs();
+
 	bool IsSpecial(const std::string &path) 	{ return path.find("/dev/") == 0; }
 	bool WriteHeader(const std::string& path);
 	bool WriteHeaderField(const std::string& key, const std::string& value);
diff --git a/src/logging/writers/ascii/Plugin.cc b/src/logging/writers/ascii/Plugin.cc
index ad41deeaf7..4692a9828a 100644
--- a/src/logging/writers/ascii/Plugin.cc
+++ b/src/logging/writers/ascii/Plugin.cc
@@ -19,7 +19,14 @@ public:
 		config.description = "ASCII log writer";
 		return config;
 		}
+protected:
+	void InitPostScript() override;
+
 } plugin;
 
+void Plugin::InitPostScript()
+	{
+	::logging::writer::Ascii::RotateLeftoverLogs();
+	}
 }
 }
diff --git a/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/test.default.log b/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/test.default.log
new file mode 100644
index 0000000000..4e5c2e499a
--- /dev/null
+++ b/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/test.default.log
@@ -0,0 +1 @@
+{"s":"leftover test"}
diff --git a/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/test.postproc.log b/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/test.postproc.log
new file mode 100644
index 0000000000..4e5c2e499a
--- /dev/null
+++ b/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/test.postproc.log
@@ -0,0 +1 @@
+{"s":"leftover test"}
diff --git a/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/zeek.logger-1.postproc.out b/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/zeek.logger-1.postproc.out
new file mode 100644
index 0000000000..5e243ecf49
--- /dev/null
+++ b/testing/btest/Baseline/supervisor.config-cluster-leftover-log-archival/zeek.logger-1.postproc.out
@@ -0,0 +1 @@
+running my rotation postprocessor
diff --git a/testing/btest/supervisor/config-cluster-leftover-log-archival.zeek b/testing/btest/supervisor/config-cluster-leftover-log-archival.zeek
new file mode 100644
index 0000000000..900bb2cd9f
--- /dev/null
+++ b/testing/btest/supervisor/config-cluster-leftover-log-archival.zeek
@@ -0,0 +1,112 @@
+# @TEST-PORT: SUPERVISOR_PORT
+# @TEST-PORT: LOGGER_PORT
+
+# Test default leftover log rotation/archival behavior
+# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT
+# @TEST-EXEC: btest-bg-wait 45
+
+# @TEST-EXEC: cp zeek/logger-1/test*.log test.default.log
+# @TEST-EXEC: btest-diff test.default.log
+# @TEST-EXEC: rm -rf ./zeek
+
+# Test leftover log rotation/archival behavior with custom postprocessor func
+# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT use_custom_postproc=T
+# @TEST-EXEC: btest-bg-wait 45
+
+# @TEST-EXEC: cp zeek/logger-1/test*.log test.postproc.log
+# @TEST-EXEC: btest-diff test.postproc.log
+# @TEST-EXEC: btest-diff zeek/logger-1/postproc.out
+# @TEST-EXEC: rm -rf ./zeek
+
+@load base/frameworks/cluster
+
+option use_custom_postproc = F;
+
+# JSON for log file brevity.
+redef LogAscii::use_json=T;
+
+global topic = "test-topic";
+
+module Test;
+export {
+	redef enum Log::ID += { LOG };
+
+	type Log: record {
+		s: string;
+	} &log;
+}
+module GLOBAL;
+
+module LogAscii;
+export {
+function my_rotation_postprocessor(info: Log::RotationInfo) : bool
+	{
+	local f = open("postproc.out");
+	print f, "running my rotation postprocessor";
+	close(f);
+	return LogAscii::default_rotation_postprocessor_func(info);
+	}
+}
+module GLOBAL;
+
+event zeek_init()
+	{
+	Log::create_stream(Test::LOG, [$columns=Test::Log]);
+
+	if ( use_custom_postproc )
+		{
+		local df = Log::get_filter(Test::LOG, "default");
+		df$postprocessor = LogAscii::my_rotation_postprocessor;
+		Log::add_filter(Test::LOG, df);
+		}
+
+	if ( Supervisor::is_supervisor() )
+		{
+		Broker::subscribe(topic);
+		Broker::listen("127.0.0.1", to_port(getenv("SUPERVISOR_PORT")));
+		Broker::peer("127.0.0.1", to_port(getenv("LOGGER_PORT")));
+
+		local cluster: table[string] of Supervisor::ClusterEndpoint;
+		cluster["logger-1"] = [$role=Supervisor::LOGGER, $host=127.0.0.1,
+		                       $p=to_port(getenv("LOGGER_PORT"))];
+
+		for ( n, ep in cluster )
+			{
+			local sn = Supervisor::NodeConfig($name = n);
+			sn$cluster = cluster;
+			sn$directory = n;
+
+			# Hard to test the full process of a kill/crash leaving these
+			# leftover files, so just fake them.
+			mkdir(sn$directory);
+			local f = open(fmt("%s/test.log", sn$directory));
+			print f, "{\"s\":\"leftover test\"}";
+			close(f);
+			local sf = open(fmt("%s/.shadow.test.log", sn$directory));
+			print sf, ".log";
+
+			if ( use_custom_postproc )
+				print sf, "LogAscii::my_rotation_postprocessor";
+			else
+				print sf, "";
+
+			close(sf);
+
+			local res = Supervisor::create(sn);
+
+			if ( res != "" )
+				print fmt("failed to create node %s: %s", n, res);
+			}
+		}
+	else
+		{
+		Broker::subscribe(topic);
+		Broker::peer("127.0.0.1", to_port(getenv("SUPERVISOR_PORT")));
+		}
+	}
+
+event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
+	{
+	if ( Supervisor::is_supervisor() )
+		terminate();
+	}