Fix log manager hanging on waiting for pending file rotations.

This changes writer implementations to always respond to rotation
messages in their DoRotate() method, even for failure/no-op cases
with a new RotationFailedMessage.  This informs the manager to
decrement its count of pending rotations.

Addresses #860.
This commit is contained in:
Jon Siwek 2012-07-27 13:31:17 -05:00 committed by Robin Sommer
parent 4bdac985cb
commit 4359bf6b42
8 changed files with 79 additions and 1 deletions

View file

@ -1215,12 +1215,16 @@ bool Manager::Flush(EnumVal* id)
void Manager::Terminate() void Manager::Terminate()
{ {
// Make sure we process all the pending rotations. // Make sure we process all the pending rotations.
while ( rotations_pending )
while ( rotations_pending > 0 )
{ {
thread_mgr->ForceProcessing(); // A blatant layering violation ... thread_mgr->ForceProcessing(); // A blatant layering violation ...
usleep(1000); usleep(1000);
} }
if ( rotations_pending < 0 )
reporter->InternalError("Negative pending log rotations: %d", rotations_pending);
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s ) for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{ {
if ( ! *s ) if ( ! *s )
@ -1384,3 +1388,11 @@ bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, con
return result; return result;
} }
bool Manager::FailedRotation(WriterFrontend* writer, const char* filename,
double open, double close, bool terminating)
{
--rotations_pending;
DBG_LOG(DBG_LOGGING, "Failed rotating writer '%s', file '%s' at %.6f,",
writer->Name(), filename, network_time);
return true;
}

View file

@ -153,6 +153,7 @@ public:
protected: protected:
friend class WriterFrontend; friend class WriterFrontend;
friend class RotationFinishedMessage; friend class RotationFinishedMessage;
friend class RotationFailedMessage;
friend class ::RemoteSerializer; friend class ::RemoteSerializer;
friend class ::RotationTimer; friend class ::RotationTimer;
@ -178,6 +179,12 @@ protected:
bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name, bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating); double open, double close, bool terminating);
// Signals that a file couldn't be rotated, either because the writer
// implementation decided there was nothing to do or because a real error
// occurred. In the error case, a separate message for the reason is sent.
bool FailedRotation(WriterFrontend* writer, const char* filename,
double open, double close, bool terminating);
// Deletes the values as passed into Write(). // Deletes the values as passed into Write().
void DeleteVals(int num_fields, threading::Value** vals); void DeleteVals(int num_fields, threading::Value** vals);

View file

@ -43,6 +43,32 @@ private:
bool terminating; bool terminating;
}; };
class RotationFailedMessage : public threading::OutputMessage<WriterFrontend>
{
public:
RotationFailedMessage(WriterFrontend* writer, const char* filename,
double open, double close, bool terminating)
: threading::OutputMessage<WriterFrontend>("RotationFailed", writer),
filename(copy_string(filename)), open(open),
close(close), terminating(terminating) { }
virtual ~RotationFailedMessage()
{
delete [] filename;
}
virtual bool Process()
{
return log_mgr->FailedRotation(Object(), filename, open, close, terminating);
}
private:
const char* filename;
double open;
double close;
bool terminating;
};
class FlushWriteBufferMessage : public threading::OutputMessage<WriterFrontend> class FlushWriteBufferMessage : public threading::OutputMessage<WriterFrontend>
{ {
public: public:
@ -164,6 +190,13 @@ bool WriterBackend::FinishedRotation(const char* new_name, const char* old_name,
return true; return true;
} }
bool WriterBackend::FailedRotation(const char* filename, double open,
double close, bool terminating)
{
SendOut(new RotationFailedMessage(frontend, filename, open, close, terminating));
return true;
}
void WriterBackend::DisableFrontend() void WriterBackend::DisableFrontend()
{ {
SendOut(new DisableMessage(frontend)); SendOut(new DisableMessage(frontend));

View file

@ -229,6 +229,25 @@ public:
bool FinishedRotation(const char* new_name, const char* old_name, bool FinishedRotation(const char* new_name, const char* old_name,
double open, double close, bool terminating); double open, double close, bool terminating);
/**
* Signals that a file couldn't be rotated. This must be called by a
* writer's implementation of DoRotate() in all cases where
* FinishedRotation() was not called or failed.
*
* Most of the parameters should be passed through from DoRotate().
*
* @param filename The name of the file that was attempted to be rotated.
*
* @param open: The timestamp when the original file was opened.
*
* @param close: The timestamp when the origina file was closed.
*
* @param terminating: True if the original rotation request occured
* due to the main Bro process shutting down.
*/
bool FailedRotation(const char* filename, double open, double close,
bool terminating);
/** Helper method to render an IP address as a string. /** Helper method to render an IP address as a string.
* *
* @param addr The address. * @param addr The address.

View file

@ -373,7 +373,10 @@ bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool t
{ {
// Don't rotate special files or if there's not one currently open. // Don't rotate special files or if there's not one currently open.
if ( ! fd || IsSpecial(Info().path) ) if ( ! fd || IsSpecial(Info().path) )
{
FailedRotation(rotated_path, open, close, terminating);
return true; return true;
}
CloseFile(close); CloseFile(close);
@ -382,6 +385,7 @@ bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool t
if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) ) if ( ! FinishedRotation(nname.c_str(), fname.c_str(), open, close, terminating) )
{ {
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str())); Error(Fmt("error rotating %s to %s", fname.c_str(), nname.c_str()));
return false; return false;
} }

View file

@ -407,6 +407,7 @@ bool DataSeries::DoRotate(const char* rotated_path, double open, double close, b
if ( ! FinishedRotation(nname.c_str(), dsname.c_str(), open, close, terminating) ) if ( ! FinishedRotation(nname.c_str(), dsname.c_str(), open, close, terminating) )
{ {
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str())); Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str()));
return false; return false;
} }

View file

@ -323,6 +323,7 @@ bool ElasticSearch::DoRotate(const char* rotated_path, double open, double close
if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) ) if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) )
{ {
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str())); Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
} }

View file

@ -46,6 +46,7 @@ bool None::DoRotate(const char* rotated_path, double open, double close, bool te
{ {
if ( ! FinishedRotation("/dev/null", Info().path, open, close, terminating)) if ( ! FinishedRotation("/dev/null", Info().path, open, close, terminating))
{ {
FailedRotation(rotated_path, open, close, terminating);
Error(Fmt("error rotating %s", Info().path)); Error(Fmt("error rotating %s", Info().path));
return false; return false;
} }