New bif log_set_buf() to set the buffering state for a stream.

This commit is contained in:
Robin Sommer 2011-02-21 17:33:29 -08:00
parent 434f57f85f
commit cf148c8a25
8 changed files with 78 additions and 37 deletions

View file

@ -1,18 +1,19 @@
List of the things not implemented yet: List of the things not implemented yet:
- Dynamic path function.
- Cluster-style remote_print - Cluster-style remote_print
- Rotation support - Rotation support
- Flushing support
- Spawning writers in separate threads (not clear if we want that initially). - Spawning writers in separate threads (not clear if we want that initially).
- Not sure if the logging does the right thing with &optional and - Not sure if the logging does the right thing with &optional and
&default values. Needs testing. &default values. Needs testing.
- Seems we could do some of the filter-related type checks - Seems we could do some of the filter-related type checks
currently done dynamically at startup via a TraversalCallback. currently done dynamically at startup via a TraversalCallback.
- LogMgs' error handling
- Put script function/constants into namespace.
- Cleanup code.
There's probably more missing. There's probably more missing.
Question: Questions:
* Is giving the record column twice to create_stream too * Is giving the record column twice to create_stream too
redundant? (once directly, and once via the event):: redundant? (once directly, and once via the event)::

View file

@ -76,6 +76,19 @@ LogMgr::~LogMgr()
delete *s; delete *s;
} }
LogMgr::Stream* LogMgr::FindStream(EnumVal* stream_id)
{
unsigned int idx = stream_id->AsEnum();
if ( idx >= streams.size() || ! streams[idx] )
{
run_time("unknown log stream");
return 0;
}
return streams[idx];
}
bool LogMgr::CreateStream(EnumVal* stream_id, RecordType* columns, EventHandlerPtr handler) bool LogMgr::CreateStream(EnumVal* stream_id, RecordType* columns, EventHandlerPtr handler)
{ {
// TODO: Should check that the record has only supported types. // TODO: Should check that the record has only supported types.
@ -182,15 +195,9 @@ bool LogMgr::AddFilter(EnumVal* stream_id, RecordVal* fval)
return false; return false;
} }
unsigned int i = stream_id->AsEnum(); Stream* stream = FindStream(stream_id);
if ( ! stream )
if ( i >= streams.size() || ! streams[i] )
{
run_time("unknown log stream");
return false; return false;
}
Stream* stream = streams[i];
// Find the right writer type. // Find the right writer type.
int writer = 0; int writer = 0;
@ -317,15 +324,9 @@ bool LogMgr::AddFilter(EnumVal* stream_id, RecordVal* fval)
bool LogMgr::RemoveFilter(EnumVal* stream_id, StringVal* filter) bool LogMgr::RemoveFilter(EnumVal* stream_id, StringVal* filter)
{ {
unsigned int idx = stream_id->AsEnum(); Stream* stream = FindStream(stream_id);
if ( ! stream )
if ( idx >= streams.size() || ! streams[idx] )
{
run_time("unknown log stream");
return false; return false;
}
Stream* stream = streams[idx];
string name = filter->AsString()->CheckString(); string name = filter->AsString()->CheckString();
@ -348,15 +349,9 @@ bool LogMgr::RemoveFilter(EnumVal* stream_id, StringVal* filter)
bool LogMgr::Write(EnumVal* stream_id, RecordVal* columns) bool LogMgr::Write(EnumVal* stream_id, RecordVal* columns)
{ {
unsigned int idx = stream_id->AsEnum(); Stream* stream = FindStream(stream_id);
if ( ! stream )
if ( idx >= streams.size() || ! streams[idx] )
{
run_time("unknown log stream");
return false; return false;
}
Stream* stream = streams[idx];
columns = columns->CoerceTo(stream->columns); columns = columns->CoerceTo(stream->columns);
@ -538,9 +533,21 @@ LogVal** LogMgr::RecordToFilterVals(Filter* filter, RecordVal* columns)
return vals; return vals;
} }
bool LogMgr::SetBuf(EnumVal* stream_id, bool enabled)
{
Stream* stream = FindStream(stream_id);
if ( ! stream )
return false;
for ( list<Filter*>::iterator i = stream->filters.begin(); i != stream->filters.end(); ++i )
{
for ( Filter::WriterMap::iterator j = (*i)->writers.begin(); j != (*i)->writers.end(); j++ )
j->second->SetBuf(enabled);
}
return true;
}
void LogMgr::Error(LogWriter* writer, const char* msg) void LogMgr::Error(LogWriter* writer, const char* msg)
{ {
#if 0
#endif
} }

View file

@ -59,6 +59,7 @@ public:
bool AddFilter(EnumVal* stream_id, RecordVal* filter); bool AddFilter(EnumVal* stream_id, RecordVal* filter);
bool RemoveFilter(EnumVal* stream_id, StringVal* filter); bool RemoveFilter(EnumVal* stream_id, StringVal* filter);
bool Write(EnumVal* stream_id, RecordVal* columns); bool Write(EnumVal* stream_id, RecordVal* columns);
bool SetBuf(EnumVal* stream_id, bool enabled); // Changes the state for all writers for that stream.
protected: protected:
friend class LogWriter; friend class LogWriter;
@ -74,6 +75,7 @@ private:
bool TraverseRecord(Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, string path, list<int> indices); bool TraverseRecord(Filter* filter, RecordType* rt, TableVal* include, TableVal* exclude, string path, list<int> indices);
LogVal** RecordToFilterVals(Filter* filter, RecordVal* columns); LogVal** RecordToFilterVals(Filter* filter, RecordVal* columns);
Stream* FindStream(EnumVal* stream_id);
vector<Stream *> streams; // Indexed by stream enum. vector<Stream *> streams; // Indexed by stream enum.
}; };

View file

@ -6,6 +6,7 @@ LogWriter::LogWriter()
{ {
buf = 0; buf = 0;
buf_len = 1024; buf_len = 1024;
buffering = true;
} }
LogWriter::~LogWriter() LogWriter::~LogWriter()
@ -37,6 +38,16 @@ void LogWriter::Finish()
DoFinish(); DoFinish();
} }
bool LogWriter::SetBuf(bool enabled)
{
if ( enabled == buffering )
// No change.
return true;
buffering = enabled;
return DoSetBuf(enabled);
}
const char* LogWriter::Fmt(const char* format, ...) const char* LogWriter::Fmt(const char* format, ...)
{ {
if ( ! buf ) if ( ! buf )

View file

@ -36,6 +36,9 @@ public:
// performed. // performed.
void Finish(); void Finish();
// Sets the buffering status for the writer, if the writer supports
bool SetBuf(bool enabled);
protected: protected:
//// Methods for Writers to override. //// Methods for Writers to override.
@ -50,15 +53,17 @@ protected:
// should be reported via Error(). // should be reported via Error().
virtual bool DoWrite(int num_fields, LogField** fields, LogVal** vals) = 0; virtual bool DoWrite(int num_fields, LogField** fields, LogVal** vals) = 0;
// Called when the flushing status for this writer is changed. If // Called when the buffering status for this writer is changed. If
// flushing is enabled, the writer should attempt to write out // buffering is disabled, the writer should attempt to write out
// information as quickly as possible even if that may have an // information as quickly as possible even if that may have an
// performance impact. If disabled (which the writer should assume to be // performance impact. If enabled (which the writer should assume to be
// the default), then it can buffer things up as necessary and write out // the default), then it can buffer things up as necessary and write out
// in a way optimized for performance. // in a way optimized for performance. The current buffering state can
// alse be queried via IsBuf().
// //
// A writer may ignore flushing if it doesn't fit with its semantics. // A writer may ignore buffering if it doesn't fit with its semantics.
virtual void DoSetFlushing(bool enabled) = 0; // Still return true in that case.
virtual bool DoSetBuf(bool enabled) = 0;
// Called when a log output is to be rotated. Most directly, this only // Called when a log output is to be rotated. Most directly, this only
// applies to writers outputting files, thoug a writer may also trigger // applies to writers outputting files, thoug a writer may also trigger
@ -91,6 +96,9 @@ protected:
// A thread-safe version of fmt(). // A thread-safe version of fmt().
const char* Fmt(const char* format, ...); const char* Fmt(const char* format, ...);
// Returns the current buffering state.
bool IsBuf() { return buffering; }
// Reports an error. // Reports an error.
void Error(const char *msg); void Error(const char *msg);
@ -104,6 +112,7 @@ private:
string path; string path;
int num_fields; int num_fields;
LogField** fields; LogField** fields;
bool buffering;
// For Fmt(). // For Fmt().
char* buf; char* buf;

View file

@ -125,6 +125,9 @@ bool LogWriterAscii::DoWrite(int num_fields, LogField** fields, LogVal** vals)
return false; return false;
} }
if ( IsBuf() )
fflush(file);
return true; return true;
} }
@ -133,8 +136,10 @@ bool LogWriterAscii::DoRotate(string rotated_path)
return true; return true;
} }
void LogWriterAscii::DoSetFlushing(bool enabled) bool LogWriterAscii::DoSetBuf(bool enabled)
{ {
// Nothing to do.
return true;
} }

View file

@ -17,7 +17,7 @@ public:
protected: protected:
virtual bool DoInit(string path, int num_fields, LogField** fields); virtual bool DoInit(string path, int num_fields, LogField** fields);
virtual bool DoWrite(int num_fields, LogField** fields, LogVal** vals); virtual bool DoWrite(int num_fields, LogField** fields, LogVal** vals);
virtual void DoSetFlushing(bool enabled); virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(string rotated_path); virtual bool DoRotate(string rotated_path);
virtual void DoFinish(); virtual void DoFinish();

View file

@ -548,6 +548,12 @@ function log_write%(id: Log_ID, columns: any%) : bool
return new Val(result, TYPE_BOOL); return new Val(result, TYPE_BOOL);
%} %}
function log_set_buf%(id: Log_ID, buffered: bool%): bool
%{
bool result = log_mgr->SetBuf(id->AsEnumVal(), buffered);
return new Val(result, TYPE_BOOL);
%}
function record_type_to_vector%(rt: string%): string_vec function record_type_to_vector%(rt: string%): string_vec
%{ %{
VectorVal* result = VectorVal* result =