Remove Log::rotation_control (addresses #572).

Log rotation is now controlled directly through Filter records.

Also addressed a TODO in the default_path_func regarding the
LogMgr::AddFilter function generating internal filter path
suggestions/fallbacks.  Now, if the user doesn't explicitly set a filter
path, the filter's path will be the result of the first call to
default_path_func (happens during the first write to the log).  And in
that case the path suggestion argument to the path_func is an empty
string.
This commit is contained in:
Jon Siwek 2011-09-08 14:46:17 -05:00
parent d8c716ae17
commit fe38c22d2b
11 changed files with 73 additions and 291 deletions

View file

@ -36,6 +36,8 @@ struct LogMgr::Filter {
EnumVal* writer;
bool local;
bool remote;
double interval;
Func* postprocessor;
int num_fields;
LogField** fields;
@ -52,7 +54,9 @@ struct LogMgr::WriterInfo {
EnumVal* type;
double open_time;
Timer* rotation_timer;
LogWriter *writer;
double interval;
Func* postprocessor;
LogWriter* writer;
};
struct LogMgr::Stream {
@ -382,17 +386,6 @@ bool LogVal::Write(SerializationFormat* fmt) const
LogMgr::Filter::~Filter()
{
// If there's a rotation control table entry associated with this
// filter's (writer, path), remove it
TableVal* rc = BifConst::Log::rotation_control->AsTableVal();
ListVal* index = new ListVal(TYPE_ANY);
index->Append(writer->Ref());
index->Append(path_val->Ref());
Unref(rc->Delete(index));
Unref(index);
for ( int i = 0; i < num_fields; ++i )
delete fields[i];
@ -743,6 +736,8 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
Val* path_func = fval->LookupWithDefault(rtype->FieldOffset("path_func"));
Val* log_local = fval->LookupWithDefault(rtype->FieldOffset("log_local"));
Val* log_remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"));
Val* interv = fval->LookupWithDefault(rtype->FieldOffset("interv"));
Val* postprocessor = fval->LookupWithDefault(rtype->FieldOffset("postprocessor"));
Filter* filter = new Filter;
filter->name = name->AsString()->CheckString();
@ -752,12 +747,16 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
filter->writer = writer->Ref()->AsEnumVal();
filter->local = log_local->AsBool();
filter->remote = log_remote->AsBool();
filter->interval = interv->AsInterval();
filter->postprocessor = postprocessor ? postprocessor->AsFunc() : 0;
Unref(name);
Unref(pred);
Unref(path_func);
Unref(log_local);
Unref(log_remote);
Unref(interv);
Unref(postprocessor);
// Build the list of fields that the filter wants included, including
// potentially rolling out fields.
@ -783,40 +782,14 @@ bool LogMgr::AddFilter(EnumVal* id, RecordVal* fval)
else
{
// If no path is given, use the Stream ID as the default but
// strip the namespace.
const char* s = stream->name.c_str();
const char* e = s + strlen(s);
const char* t = strstr(s, "::");
if ( t )
s = t + 2;
string path(s, e);
std::transform(path.begin(), path.end(), path.begin(), ::tolower);
filter->path = path;
filter->path_val = new StringVal(path.c_str());
// If no path is given, it's derived based upon the value returned by
// the first call to the filter's path_func (during first write)
filter->path_val = 0;
}
// Remove any filter with the same name we might already have.
RemoveFilter(id, filter->name);
// Install the rotation support, if given
Val* rot_val = fval->Lookup(rtype->FieldOffset("rotation"));
if ( rot_val )
{
TableVal* rc = BifConst::Log::rotation_control->AsTableVal();
ListVal* index = new ListVal(TYPE_ANY);
index->Append(filter->writer->Ref());
index->Append(filter->path_val->Ref());
rc->Assign(index, rot_val->Ref());
Unref(index);
}
// Add the new one.
stream->filters.push_back(filter);
@ -927,7 +900,12 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
{
val_list vl(3);
vl.append(id->Ref());
vl.append(filter->path_val->Ref());
Val* path_arg;
if ( filter->path_val )
path_arg = filter->path_val;
else
path_arg = new StringVal("");
vl.append(path_arg->Ref());
vl.append(columns->Ref());
Val* v = filter->path_func->Call(&vl);
@ -938,6 +916,13 @@ bool LogMgr::Write(EnumVal* id, RecordVal* columns)
return false;
}
if ( ! filter->path_val )
{
Unref(path_arg);
filter->path = v->AsString()->CheckString();
filter->path_val = v->Ref();
}
path = v->AsString()->CheckString();
Unref(v);
@ -1264,6 +1249,24 @@ LogWriter* LogMgr::CreateWriter(EnumVal* id, EnumVal* writer, string path,
winfo->writer = writer_obj;
winfo->open_time = network_time;
winfo->rotation_timer = 0;
winfo->interval = 0;
winfo->postprocessor = 0;
// search for a corresponding filter for the writer/path pair and use its
// rotation settings
list<Filter*>::const_iterator it;
for ( it = stream->filters.begin(); it != stream->filters.end(); ++it )
{
Filter* f = *it;
if ( f->writer->AsEnum() == writer->AsEnum() &&
f->path == winfo->writer->Path() )
{
winfo->interval = f->interval;
winfo->postprocessor = f->postprocessor;
break;
}
}
InstallRotationTimer(winfo);
stream->writers.insert(
@ -1441,22 +1444,6 @@ void RotationTimer::Dispatch(double t, int is_expire)
}
}
RecordVal* LogMgr::LookupRotationControl(EnumVal* writer, string path)
{
TableVal* rc = BifConst::Log::rotation_control->AsTableVal();
ListVal* index = new ListVal(TYPE_ANY);
index->Append(writer->Ref());
index->Append(new StringVal(path.c_str()));
Val* r = rc->Lookup(index);
assert(r);
Unref(index);
return r->AsRecordVal();
}
void LogMgr::InstallRotationTimer(WriterInfo* winfo)
{
if ( terminating )
@ -1468,13 +1455,7 @@ void LogMgr::InstallRotationTimer(WriterInfo* winfo)
winfo->rotation_timer = 0;
}
RecordVal* rc =
LookupRotationControl(winfo->type, winfo->writer->Path());
assert(rc);
int idx = rc->Type()->AsRecordType()->FieldOffset("interv");
double rotation_interval = rc->LookupWithDefault(idx)->AsInterval();
double rotation_interval = winfo->interval;
if ( rotation_interval )
{
@ -1535,11 +1516,6 @@ bool LogMgr::FinishedRotation(LogWriter* writer, string new_name, string old_nam
if ( ! winfo )
return true;
RecordVal* rc =
LookupRotationControl(winfo->type, winfo->writer->Path());
assert(rc);
// Create the RotationInfo record.
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
info->Assign(0, winfo->type->Ref());
@ -1549,15 +1525,12 @@ bool LogMgr::FinishedRotation(LogWriter* writer, string new_name, string old_nam
info->Assign(4, new Val(close, TYPE_TIME));
info->Assign(5, new Val(terminating, TYPE_BOOL));
int idx = rc->Type()->AsRecordType()->FieldOffset("postprocessor");
assert(idx >= 0);
Val* func = rc->Lookup(idx);
Func* func = winfo->postprocessor;
if ( ! func )
{
ID* id = global_scope()->Lookup("Log::__default_rotation_postprocessor");
assert(id);
func = id->ID_Val();
func = id->ID_Val()->AsFunc();
}
assert(func);
@ -1565,7 +1538,10 @@ bool LogMgr::FinishedRotation(LogWriter* writer, string new_name, string old_nam
// Call the postprocessor function.
val_list vl(1);
vl.append(info);
Val* v = func->AsFunc()->Call(&vl);
ODesc d;
func->Describe(&d);
fprintf(stderr, "%s\n", d.Description());
Val* v = func->Call(&vl);
int result = v->AsBool();
Unref(v);
return result;