Tweaking writer API for failed rotations.

There are now two FinishedRotation() methods, one that triggers
post-processing and one that doesn't. There's also insurance built in
against a writer not calling either (or both), in which case we abort
with an internal error.
This commit is contained in:
Robin Sommer 2012-07-28 11:55:31 -07:00
parent 4359bf6b42
commit 4ba038070f
12 changed files with 65 additions and 74 deletions

View file

@ -1,4 +1,13 @@
2.0-905 | 2012-07-28 16:24:34 -0700
* Fix log manager hanging on waiting for pending file rotations,
plus writer API tweak for failed rotations. Addresses #860. (Jon
Siwek and Robin Sommer)
* Tweaking logs-to-elasticsearch.bro so that it doesn't do anything
if ES server is unset. (Robin Sommer)
2.0-902 | 2012-07-27 12:42:13 -0700
* New variable in logging framework Log::active_streams to indicate

View file

@ -1 +1 @@
2.0-902
2.0-905

View file

@ -1338,13 +1338,18 @@ void Manager::Rotate(WriterInfo* winfo)
}
bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating)
double open, double close, bool success, bool terminating)
{
assert(writer);
--rotations_pending;
if ( ! writer )
// Writer didn't produce local output.
if ( ! success )
{
DBG_LOG(DBG_LOGGING, "Non-successful rotating writer '%s', file '%s' at %.6f,",
writer->Name(), filename, network_time);
return true;
}
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
writer->Name(), network_time, new_name);
@ -1387,12 +1392,3 @@ bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, con
return result;
}
bool Manager::FailedRotation(WriterFrontend* writer, const char* filename,
double open, double close, bool terminating)
{
--rotations_pending;
DBG_LOG(DBG_LOGGING, "Failed rotating writer '%s', file '%s' at %.6f,",
writer->Name(), filename, network_time);
return true;
}

View file

@ -177,13 +177,7 @@ protected:
// Signals that a file has been rotated.
bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating);
// Signals that a file couldn't be rotated, either because the writer
// implementation decided there was nothing to do or because a real error
// occurred. In the error case, a separate message for the reason is sent.
bool FailedRotation(WriterFrontend* writer, const char* filename,
double open, double close, bool terminating);
double open, double close, bool success, bool terminating);
// Deletes the values as passed into Write().
void DeleteVals(int num_fields, threading::Value** vals);

View file

@ -19,10 +19,10 @@ class RotationFinishedMessage : public threading::OutputMessage<WriterFrontend>
{
public:
RotationFinishedMessage(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating)
double open, double close, bool success, bool terminating)
: threading::OutputMessage<WriterFrontend>("RotationFinished", writer),
new_name(copy_string(new_name)), old_name(copy_string(old_name)), open(open),
close(close), terminating(terminating) { }
close(close), success(success), terminating(terminating) { }
virtual ~RotationFinishedMessage()
{
@ -32,7 +32,7 @@ public:
virtual bool Process()
{
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating);
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, success, terminating);
}
private:
@ -40,32 +40,7 @@ private:
const char* old_name;
double open;
double close;
bool terminating;
};
class RotationFailedMessage : public threading::OutputMessage<WriterFrontend>
{
public:
RotationFailedMessage(WriterFrontend* writer, const char* filename,
double open, double close, bool terminating)
: threading::OutputMessage<WriterFrontend>("RotationFailed", writer),
filename(copy_string(filename)), open(open),
close(close), terminating(terminating) { }
virtual ~RotationFailedMessage()
{
delete [] filename;
}
virtual bool Process()
{
return log_mgr->FailedRotation(Object(), filename, open, close, terminating);
}
private:
const char* filename;
double open;
double close;
bool success;
bool terminating;
};
@ -152,6 +127,7 @@ WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
buffering = true;
frontend = arg_frontend;
info = new WriterInfo(frontend->Info());
rotation_counter = 0;
SetName(frontend->Name());
}
@ -186,14 +162,15 @@ void WriterBackend::DeleteVals(int num_writes, Value*** vals)
bool WriterBackend::FinishedRotation(const char* new_name, const char* old_name,
double open, double close, bool terminating)
{
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating));
--rotation_counter;
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, true, terminating));
return true;
}
bool WriterBackend::FailedRotation(const char* filename, double open,
double close, bool terminating)
bool WriterBackend::FinishedRotation()
{
SendOut(new RotationFailedMessage(frontend, filename, open, close, terminating));
--rotation_counter;
SendOut(new RotationFinishedMessage(frontend, 0, 0, 0, 0, false, false));
return true;
}
@ -303,12 +280,21 @@ bool WriterBackend::Rotate(const char* rotated_path, double open,
if ( Failed() )
return true;
rotation_counter = 1;
if ( ! DoRotate(rotated_path, open, close, terminating) )
{
DisableFrontend();
return false;
}
// Insurance against broken writers.
if ( rotation_counter > 0 )
InternalError(Fmt("writer %s did not call FinishedRotation() in DoRotation()", Name()));
if ( rotation_counter < 0 )
InternalError(Fmt("writer %s called FinishedRotation() more than once in DoRotation()", Name()));
return true;
}

View file

@ -210,11 +210,15 @@ public:
bool IsBuf() { return buffering; }
/**
* Signals that a file has been rotated. This must be called by a
* writer's implementation of DoRotate() once rotation has finished.
* Signals that a file has been successfully rotated and any
* potential post-processor can now run.
*
* Most of the parameters should be passed through from DoRotate().
*
* Note: Exactly one of the two FinishedRotation() methods must be
* called by a writer's implementation of DoRotate() once rotation
* has finished.
*
* @param new_name The filename of the rotated file.
*
* @param old_name The filename of the original file.
@ -230,13 +234,18 @@ public:
double open, double close, bool terminating);
/**
* Signals that a file couldn't be rotated. This must be called by a
* writer's implementation of DoRotate() in all cases where
* FinishedRotation() was not called or failed.
* Signals that a file rotation request has been processed, but no
* further post-processing needs to be performed (either because
* there was an error, or there was nothing to rotate to begin with
* with this writer).
*
* Most of the parameters should be passed through from DoRotate().
* Note: Exactly one of the two FinishedRotation() methods must be
* called by a writer's implementation of DoRotate() once rotation
* has finished.
*
* @param filename The name of the file that was attempted to be rotated.
* @param new_name The filename of the rotated file.
*
* @param old_name The filename of the original file.
*
* @param open: The timestamp when the original file was opened.
*
@ -245,8 +254,7 @@ public:
* @param terminating: True if the original rotation request occured
* due to the main Bro process shutting down.
*/
bool FailedRotation(const char* filename, double open, double close,
bool terminating);
bool FinishedRotation();
/** Helper method to render an IP address as a string.
*
@ -344,8 +352,8 @@ protected:
* Writer-specific method implementing log rotation. Most directly
* this only applies to writers writing into files, which should then
* close the current file and open a new one. However, a writer may
* also trigger other apppropiate actions if semantics are similar. *
* Once rotation has finished, the implementation must call
* also trigger other apppropiate actions if semantics are similar.
* Once rotation has finished, the implementation *must* call
* FinishedRotation() to signal the log manager that potential
* postprocessors can now run.
*
@ -407,6 +415,8 @@ private:
int num_fields; // Number of log fields.
const threading::Field* const* fields; // Log fields.
bool buffering; // True if buffering is enabled.
int rotation_counter; // Tracks FinishedRotation() calls.
};

View file

@ -248,9 +248,8 @@ void WriterFrontend::Rotate(const char* rotated_path, double open, double close,
if ( backend )
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
else
// Still signal log manager that we're done, but signal that
// nothing happened by setting the writer to zeri.
log_mgr->FinishedRotation(0, "", rotated_path, open, close, terminating);
// Still signal log manager that we're done.
log_mgr->FinishedRotation(this, 0, 0, 0, 0, false, terminating);
}
void WriterFrontend::DeleteVals(Value** vals)

View file

@ -374,7 +374,7 @@ bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool t
// Don't rotate special files or if there's not one currently open.
if ( ! fd || IsSpecial(Info().path) )
{
FailedRotation(rotated_path, open, close, terminating);
FinishedRotation();
return true;
}
@ -385,7 +385,6 @@ bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool t
if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) )
{
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
return false;
}

View file

@ -407,7 +407,6 @@ bool DataSeries::DoRotate(const char* rotated_path, double open, double close, b
if ( ! FinishedRotation(nname.c_str(), dsname.c_str(), open, close, terminating) )
{
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str()));
return false;
}

View file

@ -322,10 +322,7 @@ bool ElasticSearch::DoRotate(const char* rotated_path, double open, double close
}
if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) )
{
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
}
return true;
}

View file

@ -46,7 +46,6 @@ bool None::DoRotate(const char* rotated_path, double open, double close, bool te
{
if ( ! FinishedRotation("/dev/null", Info().path, open, close, terminating))
{
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s", Info().path));
return false;
}

View file

@ -113,6 +113,9 @@ std::string get_escaped_string(const std::string& str, bool escape_all)
char* copy_string(const char* s)
{
if ( ! s )
return 0;
char* c = new char[strlen(s)+1];
strcpy(c, s);
return c;