Implement leftover log rotation/archival for supervised nodes

This helps prevent a node from being killed/crashing in the middle
of writing a log, restarting, and eventually clobbering that log
file that never underwent the rotation/archival process.

The old `archive-log` and `post-terminate` scripts as used by
ZeekControl previously implemented this behavior, but the new logic is
entirely in the ASCII writer.  It uses ".shadow" log files stored
alongside the real log to help detect such scenarios and rotate them
correctly upon the next startup of the Zeek process.
This commit is contained in:
Jon Siwek 2020-06-24 14:28:54 -07:00
parent a46e24091a
commit 11949ce37a
11 changed files with 523 additions and 17 deletions

View file

@ -1170,6 +1170,11 @@ WriterFrontend* Manager::CreateWriter(zeek::EnumVal* id, zeek::EnumVal* writer,
winfo->interval = f->interval; winfo->interval = f->interval;
winfo->postprocessor = f->postprocessor; winfo->postprocessor = f->postprocessor;
if ( f->postprocessor )
{
delete [] winfo->info->post_proc_func;
winfo->info->post_proc_func = copy_string(f->postprocessor->Name());
}
break; break;
} }
@ -1180,6 +1185,18 @@ WriterFrontend* Manager::CreateWriter(zeek::EnumVal* id, zeek::EnumVal* writer,
const auto& id = zeek::detail::global_scope()->Find("Log::default_rotation_interval"); const auto& id = zeek::detail::global_scope()->Find("Log::default_rotation_interval");
assert(id); assert(id);
winfo->interval = id->GetVal()->AsInterval(); winfo->interval = id->GetVal()->AsInterval();
if ( winfo->info->post_proc_func &&
strlen(winfo->info->post_proc_func) )
{
auto func = zeek::id::find_func(winfo->info->post_proc_func);
if ( func )
winfo->postprocessor = func.get();
else
reporter->Warning("failed log postprocessor function lookup: %s\n",
winfo->info->post_proc_func);
}
} }
stream->writers.insert( stream->writers.insert(
@ -1466,23 +1483,32 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
} }
} }
std::string Manager::FormatRotationTime(time_t t)
{
struct tm tm;
char buf[128];
const char* const date_fmt = "%y-%m-%d_%H.%M.%S";
localtime_r(&t, &tm);
strftime(buf, sizeof(buf), date_fmt, &tm);
return buf;
}
std::string Manager::FormatRotationPath(std::string_view path, time_t t)
{
auto rot_str = FormatRotationTime(t);
return fmt("%.*s-%s",
static_cast<int>(path.size()), path.data(), rot_str.data());
}
void Manager::Rotate(WriterInfo* winfo) void Manager::Rotate(WriterInfo* winfo)
{ {
DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f", DBG_LOG(DBG_LOGGING, "Rotating %s at %.6f",
winfo->writer->Name(), network_time); winfo->writer->Name(), network_time);
// Build a temporary path for the writer to move the file to. // Build a temporary path for the writer to move the file to.
struct tm tm; auto tmp = FormatRotationPath(winfo->writer->Info().path,
char buf[128]; (time_t)winfo->open_time);
const char* const date_fmt = "%y-%m-%d_%H.%M.%S"; winfo->writer->Rotate(tmp.data(), winfo->open_time, network_time, terminating);
time_t teatime = (time_t)winfo->open_time;
localtime_r(&teatime, &tm);
strftime(buf, sizeof(buf), date_fmt, &tm);
// Trigger the rotation.
const char* tmp = fmt("%s-%s", winfo->writer->Info().path, buf);
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
++rotations_pending; ++rotations_pending;
} }

View file

@ -4,6 +4,9 @@
#pragma once #pragma once
#include <ctime>
#include <string_view>
#include "../Val.h" #include "../Val.h"
#include "../Tag.h" #include "../Tag.h"
#include "../EventHandler.h" #include "../EventHandler.h"
@ -26,6 +29,19 @@ class RotationFinishedMessage;
*/ */
class Manager : public plugin::ComponentManager<Tag, Component> { class Manager : public plugin::ComponentManager<Tag, Component> {
public: public:
/**
* Returns a formatted string representing the given time. This
* string is used in the log file rotation process.
*/
static std::string FormatRotationTime(time_t t);
/**
* Returns a formatted string representing the file rotation path. This
* string is used in the log file rotation process.
*/
static std::string FormatRotationPath(std::string_view path, time_t t);
/** /**
* Constructor. * Constructor.
*/ */

View file

@ -80,7 +80,9 @@ broker::data WriterBackend::WriterInfo::ToBroker() const
t.insert(std::make_pair(key, value)); t.insert(std::make_pair(key, value));
} }
return broker::vector({path, rotation_base, rotation_interval, network_time, std::move(t)}); auto bppf = post_proc_func ? post_proc_func : "";
return broker::vector({path, rotation_base, rotation_interval, network_time, std::move(t), bppf});
} }
bool WriterBackend::WriterInfo::FromBroker(broker::data d) bool WriterBackend::WriterInfo::FromBroker(broker::data d)
@ -94,11 +96,13 @@ bool WriterBackend::WriterInfo::FromBroker(broker::data d)
auto brotation_interval = caf::get_if<double>(&v[2]); auto brotation_interval = caf::get_if<double>(&v[2]);
auto bnetwork_time = caf::get_if<double>(&v[3]); auto bnetwork_time = caf::get_if<double>(&v[3]);
auto bconfig = caf::get_if<broker::table>(&v[4]); auto bconfig = caf::get_if<broker::table>(&v[4]);
auto bppf = caf::get_if<std::string>(&v[5]);
if ( ! (bpath && brotation_base && brotation_interval && bnetwork_time && bconfig) ) if ( ! (bpath && brotation_base && brotation_interval && bnetwork_time && bconfig && bppf) )
return false; return false;
path = copy_string(bpath->c_str()); path = copy_string(bpath->c_str());
post_proc_func = copy_string(bppf->c_str());
rotation_base = *brotation_base; rotation_base = *brotation_base;
rotation_interval = *brotation_interval; rotation_interval = *brotation_interval;
network_time = *bnetwork_time; network_time = *bnetwork_time;

View file

@ -61,6 +61,15 @@ public:
*/ */
const char* path; const char* path;
/**
* The name of the postprocessor function that will be called
* upon the logging manager processing the "rotation finished"
* message. A null or empty value means "use the default function".
*
* Structure takes ownership of string.
*/
const char* post_proc_func = nullptr;
/** /**
* The rotation interval as configured for this writer. * The rotation interval as configured for this writer.
*/ */
@ -90,6 +99,7 @@ public:
WriterInfo(const WriterInfo& other) WriterInfo(const WriterInfo& other)
{ {
path = other.path ? copy_string(other.path) : nullptr; path = other.path ? copy_string(other.path) : nullptr;
post_proc_func = other.post_proc_func ? copy_string(other.post_proc_func) : nullptr;
rotation_interval = other.rotation_interval; rotation_interval = other.rotation_interval;
rotation_base = other.rotation_base; rotation_base = other.rotation_base;
network_time = other.network_time; network_time = other.network_time;
@ -101,6 +111,7 @@ public:
~WriterInfo() ~WriterInfo()
{ {
delete [] path; delete [] path;
delete [] post_proc_func;
for ( config_map::iterator i = config.begin(); i != config.end(); i++ ) for ( config_map::iterator i = config.begin(); i != config.end(); i++ )
{ {

View file

@ -1,10 +1,22 @@
// See the file "COPYING" in the main distribution directory for copyright. // See the file "COPYING" in the main distribution directory for copyright.
#include <ctime>
#include <cstdio>
#include <string> #include <string>
#include <vector>
#include <memory>
#include <optional>
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
#include <dirent.h>
#include "Func.h"
#include "supervisor/Supervisor.h"
#include "logging/Manager.h"
#include "threading/SerialTypes.h" #include "threading/SerialTypes.h"
#include "Ascii.h" #include "Ascii.h"
@ -16,6 +28,160 @@ using namespace threading;
using threading::Value; using threading::Value;
using threading::Field; using threading::Field;
static constexpr auto shadow_file_prefix = ".shadow.";
/**
* Information about an leftover log file: that is, one that a previous
* process was in the middle of writing, but never completed a rotation
* for whatever reason (prematurely crashed/killed).
*/
struct LeftoverLog {
/*
* Name of leftover log, relative to working dir.
*/
std::string filename;
/*
* File extension of the leftover log (e.g. ".log").
*/
std::string extension;
/*
* Name of shadow file associated with the log.
* The shadow file's existence is what indicates the presence of
* an "leftover log" and may contain the name of a postprocessing
* function that's supposed to be called after rotating (only
* named if that function differs from the default). Upon
* completing a rotation, the shadow file can be deleted.
*/
std::string shadow_filename;
/**
* Name of a function to call to postprocess the log file after
* rotating.
*/
std::string post_proc_func;
/**
* The time at which the shadow file was created. This is used
* as the log file's "opening time" for rotation purposes.
*/
time_t open_time;
/**
* Time of the log file's last modification. This is used
* as the log file's "closing time" for rotation purposes.
*/
time_t close_time;
/**
* Set the an error message explaining any error that happened while
* trying to parse the shadow file and construct an object.
*/
std::string error;
/**
* Return the name of the log without file extension (this is
* what's called "path" in other logging framework parlance).
* E.g. the name of "conn.log" is just "conn".
*/
std::string Name() const
{ return filename.substr(0, filename.size() - extension.size()); }
/**
* The path to which the log file should be rotated (before
* calling any postprocessing function).
*/
std::string RotationPath() const
{ return log_mgr->FormatRotationPath(Name(), open_time) + extension; }
/**
* Performs the rename() call to rotate the file and returns whether
* it succeeded.
*/
bool Rename() const
{ return rename(filename.data(), RotationPath().data()) == 0; }
/**
* Deletes the shadow file and returns whether it succeeded.
*/
bool DeleteShadow() const
{ return unlink(shadow_filename.data()) == 0; }
};
static std::optional<LeftoverLog> parse_shadow_log(const std::string& fname)
{
char errbuf[512];
auto sfname = shadow_file_prefix + fname;
LeftoverLog rval;
rval.filename = fname;
rval.shadow_filename = std::move(sfname);
auto sf_stream = fopen(rval.shadow_filename.data(), "r");
if ( ! sf_stream )
{
bro_strerror_r(errno, errbuf, sizeof(errbuf));
rval.error = "Failed to open " + rval.shadow_filename + ": " + errbuf;
return rval;
}
fseek(sf_stream, 0, SEEK_END);
auto sf_len = ftell(sf_stream);
fseek(sf_stream, 0, SEEK_SET);
auto sf_content = std::make_unique<char[]>(sf_len);
auto bytes_read = fread(sf_content.get(), 1, sf_len, sf_stream);
if ( bytes_read != static_cast<size_t>(sf_len) )
{
rval.error = "Failed to read contents of " + rval.shadow_filename;
return rval;
}
fclose(sf_stream);
std::string_view sf_view(sf_content.get(), sf_len);
auto sf_lines = tokenize_string(sf_view, '\n');
if ( sf_lines.size() < 2 )
{
snprintf(errbuf, sizeof(errbuf),
"Found leftover log, '%s', but the associated shadow file, "
"'%s', required to process it is invalid",
rval.filename.data(), rval.shadow_filename.data());
rval.error = errbuf;
return rval;
}
rval.extension = sf_lines[0];
rval.post_proc_func = sf_lines[1];
struct stat st;
// Use shadow file's modification time as creation time.
if ( stat(rval.shadow_filename.data(), &st) != 0 )
{
bro_strerror_r(errno, errbuf, sizeof(errbuf));
rval.error = "Failed to stat " + rval.shadow_filename + ": " + errbuf;
return rval;
}
rval.open_time = st.st_ctime;
// Use log file's modification time for closing time.
if ( stat(rval.filename.data(), &st) != 0 )
{
bro_strerror_r(errno, errbuf, sizeof(errbuf));
rval.error = "Failed to stat " + rval.filename + ": " + errbuf;
return rval;
}
rval.close_time = st.st_mtime;
return rval;
}
Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
{ {
fd = 0; fd = 0;
@ -261,12 +427,45 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
if ( output_to_stdout ) if ( output_to_stdout )
path = "/dev/stdout"; path = "/dev/stdout";
fname = IsSpecial(path) ? path : path + "." + LogExt(); fname = path;
if ( gzip_level > 0 ) if ( ! IsSpecial(fname) )
{ {
fname += "."; std::string ext = "." + LogExt();
fname += gzip_file_extension.empty() ? "gz" : gzip_file_extension;
if ( gzip_level > 0 )
{
ext += ".";
ext += gzip_file_extension.empty() ? "gz" : gzip_file_extension;
}
fname += ext;
bool use_shadow = zeek::Supervisor::ThisNode() && info.rotation_interval > 0;
if ( use_shadow )
{
auto sfname = shadow_file_prefix + fname;
auto sfd = open(sfname.data(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
if ( sfd < 0 )
{
Error(Fmt("cannot open %s: %s", sfname.data(), Strerror(errno)));
return false;
}
safe_write(sfd, ext.data(), ext.size());
safe_write(sfd, "\n", 1);
auto ppf = info.post_proc_func;
if ( ppf )
safe_write(sfd, ppf, strlen(ppf));
safe_write(sfd, "\n", 1);
safe_close(sfd);
}
} }
fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
@ -459,6 +658,20 @@ bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool t
return false; return false;
} }
bool use_shadow = zeek::Supervisor::ThisNode() && Info().rotation_interval > 0;
if ( use_shadow )
{
auto sfname = shadow_file_prefix + fname;
if ( unlink(sfname.data()) != 0 )
{
Error(Fmt("cannot unlink %s: %s", sfname.data(), Strerror(errno)));
FinishedRotation();
return false;
}
}
if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) ) if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) )
{ {
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str())); Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
@ -480,6 +693,114 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
return true; return true;
} }
static std::vector<LeftoverLog> find_leftover_logs()
{
std::vector<LeftoverLog> rval;
std::vector<std::string> stale_shadow_files;
auto prefix_len = strlen(shadow_file_prefix);
auto d = opendir(".");
struct dirent* dp;
while ( (dp = readdir(d)) )
{
if ( strncmp(dp->d_name, shadow_file_prefix, prefix_len) != 0 )
continue;
std::string log_name = dp->d_name + prefix_len;
if ( is_file(log_name) )
{
if ( auto ll = parse_shadow_log(log_name) )
{
if ( ll->error.empty() )
rval.emplace_back(std::move(*ll));
else
reporter->Error("failed to process leftover log '%s': %s",
log_name.data(), ll->error.data());
}
}
else
// There was a log here. It's gone now.
stale_shadow_files.emplace_back(dp->d_name);
}
for ( const auto& f : stale_shadow_files )
if ( unlink(f.data()) != 0 )
reporter->Error("cannot unlink %s: %s", f.data(), strerror(errno));
closedir(d);
return rval;
}
void Ascii::RotateLeftoverLogs()
{
if ( ! zeek::Supervisor::ThisNode() )
return;
// Log file crash recovery: if there's still leftover shadow files from the
// ASCII log writer, attempt to rotate their associated log file. Ideally
// may be better if the ASCII writer itself could implement the entire
// crash recovery logic itself without being called from external, but (1)
// this does need to get called from a particular point in the
// initialization process (after zeek_init()) and (2) the nature of writers
// being instantiated lazily means that trying to rotate a leftover log
// only upon seeing that an open() will clobber something means they'll
// possibly not be rotated in a timely manner (e.g. a log files that are
// rarely written to). So the logic below drives the entire leftover log
// crash recovery process for a supervised node upon startup.
auto leftover_logs = find_leftover_logs();
for ( const auto& ll : leftover_logs )
{
if ( ! ll.Rename() )
reporter->FatalError("Found leftover/unprocessed log '%s', but "
"failed to rotate it: %s",
ll.filename.data(), strerror(errno));
if ( ! ll.DeleteShadow() )
// Unusual failure to report, but not strictly fatal.
reporter->Error("Failed to unlink %s: %s",
ll.shadow_filename.data(), strerror(errno));
static auto rt = zeek::id::find_type<zeek::RecordType>("Log::RotationInfo");
static auto writer_type = zeek::id::find_type<zeek::EnumType>("Log::Writer");
static auto writer_idx = writer_type->Lookup("Log", "WRITER_ASCII");
static auto writer_val = writer_type->GetVal(writer_idx);
auto info = make_intrusive<RecordVal>(rt);
info->Assign(0, writer_val);
info->Assign(1, make_intrusive<StringVal>(ll.RotationPath()));
info->Assign(2, make_intrusive<StringVal>(ll.Name()));
info->Assign(3, make_intrusive<TimeVal>(ll.open_time));
info->Assign(4, make_intrusive<TimeVal>(ll.close_time));
info->Assign(5, val_mgr->False());
auto ppf = ll.post_proc_func.empty() ? "Log::__default_rotation_postprocessor"
: ll.post_proc_func.data();
auto func = zeek::id::find_func(ppf);
if ( ! func )
reporter->Error("Postprocessing log '%s' failed: "
"no such function: '%s'",
ll.filename.data(), ppf);
try
{
func->Invoke(std::move(info));
reporter->Info("Rotated/postprocessed leftover log '%s'",
ll.filename.data());
}
catch ( InterpreterException& e )
{
reporter->Info("Postprocess function '%s' failed for leftover log '%s'",
ppf, ll.filename.data());
}
}
}
string Ascii::LogExt() string Ascii::LogExt()
{ {
const char* ext = zeekenv("ZEEK_LOG_SUFFIX"); const char* ext = zeekenv("ZEEK_LOG_SUFFIX");

View file

@ -10,6 +10,8 @@
#include "Desc.h" #include "Desc.h"
#include "zlib.h" #include "zlib.h"
namespace plugin::Zeek_AsciiWriter { class Plugin; }
namespace logging { namespace writer { namespace logging { namespace writer {
class Ascii : public WriterBackend { class Ascii : public WriterBackend {
@ -35,6 +37,10 @@ protected:
bool DoHeartbeat(double network_time, double current_time) override; bool DoHeartbeat(double network_time, double current_time) override;
private: private:
friend class plugin::Zeek_AsciiWriter::Plugin;
static void RotateLeftoverLogs();
bool IsSpecial(const std::string &path) { return path.find("/dev/") == 0; } bool IsSpecial(const std::string &path) { return path.find("/dev/") == 0; }
bool WriteHeader(const std::string& path); bool WriteHeader(const std::string& path);
bool WriteHeaderField(const std::string& key, const std::string& value); bool WriteHeaderField(const std::string& key, const std::string& value);

View file

@ -19,7 +19,14 @@ public:
config.description = "ASCII log writer"; config.description = "ASCII log writer";
return config; return config;
} }
protected:
void InitPostScript() override;
} plugin; } plugin;
void Plugin::InitPostScript()
{
::logging::writer::Ascii::RotateLeftoverLogs();
}
} }
} }

View file

@ -0,0 +1 @@
{"s":"leftover test"}

View file

@ -0,0 +1 @@
{"s":"leftover test"}

View file

@ -0,0 +1 @@
running my rotation postprocessor

View file

@ -0,0 +1,112 @@
# @TEST-PORT: SUPERVISOR_PORT
# @TEST-PORT: LOGGER_PORT
# Test default leftover log rotation/archival behavior
# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT
# @TEST-EXEC: btest-bg-wait 45
# @TEST-EXEC: cp zeek/logger-1/test*.log test.default.log
# @TEST-EXEC: btest-diff test.default.log
# @TEST-EXEC: rm -rf ./zeek
# Test leftover log rotation/archival behavior with custom postprocessor func
# @TEST-EXEC: btest-bg-run zeek zeek -j -b %INPUT use_custom_postproc=T
# @TEST-EXEC: btest-bg-wait 45
# @TEST-EXEC: cp zeek/logger-1/test*.log test.postproc.log
# @TEST-EXEC: btest-diff test.postproc.log
# @TEST-EXEC: btest-diff zeek/logger-1/postproc.out
# @TEST-EXEC: rm -rf ./zeek
@load base/frameworks/cluster
option use_custom_postproc = F;
# JSON for log file brevity.
redef LogAscii::use_json=T;
global topic = "test-topic";
module Test;
export {
redef enum Log::ID += { LOG };
type Log: record {
s: string;
} &log;
}
module GLOBAL;
module LogAscii;
export {
function my_rotation_postprocessor(info: Log::RotationInfo) : bool
{
local f = open("postproc.out");
print f, "running my rotation postprocessor";
close(f);
return LogAscii::default_rotation_postprocessor_func(info);
}
}
module GLOBAL;
event zeek_init()
{
Log::create_stream(Test::LOG, [$columns=Test::Log]);
if ( use_custom_postproc )
{
local df = Log::get_filter(Test::LOG, "default");
df$postprocessor = LogAscii::my_rotation_postprocessor;
Log::add_filter(Test::LOG, df);
}
if ( Supervisor::is_supervisor() )
{
Broker::subscribe(topic);
Broker::listen("127.0.0.1", to_port(getenv("SUPERVISOR_PORT")));
Broker::peer("127.0.0.1", to_port(getenv("LOGGER_PORT")));
local cluster: table[string] of Supervisor::ClusterEndpoint;
cluster["logger-1"] = [$role=Supervisor::LOGGER, $host=127.0.0.1,
$p=to_port(getenv("LOGGER_PORT"))];
for ( n, ep in cluster )
{
local sn = Supervisor::NodeConfig($name = n);
sn$cluster = cluster;
sn$directory = n;
# Hard to test the full process of a kill/crash leaving these
# leftover files, so just fake them.
mkdir(sn$directory);
local f = open(fmt("%s/test.log", sn$directory));
print f, "{\"s\":\"leftover test\"}";
close(f);
local sf = open(fmt("%s/.shadow.test.log", sn$directory));
print sf, ".log";
if ( use_custom_postproc )
print sf, "LogAscii::my_rotation_postprocessor";
else
print sf, "";
close(sf);
local res = Supervisor::create(sn);
if ( res != "" )
print fmt("failed to create node %s: %s", n, res);
}
}
else
{
Broker::subscribe(topic);
Broker::peer("127.0.0.1", to_port(getenv("SUPERVISOR_PORT")));
}
}
event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
{
if ( Supervisor::is_supervisor() )
terminate();
}