Merge branch 'robin/topic/writer-info'

* robin/topic/writer-info:
  Extending the log writer DoInit() API.
  Reworking log writer API to make it easier to pass additional information to a writer's initialization method.

Conflicts:
	src/logging/WriterBackend.cc
	src/logging/WriterBackend.h
	src/logging/WriterFrontend.cc
This commit is contained in:
Robin Sommer 2012-07-02 15:19:15 -07:00
commit 90763bb2f2
27 changed files with 371 additions and 92 deletions

View file

@ -51,6 +51,7 @@ struct Manager::Filter {
string path;
Val* path_val;
EnumVal* writer;
TableVal* config;
bool local;
bool remote;
double interval;
@ -74,6 +75,7 @@ struct Manager::WriterInfo {
double interval;
Func* postprocessor;
WriterFrontend* writer;
WriterBackend::WriterInfo info;
};
struct Manager::Stream {
@ -518,6 +520,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Val* log_remote = fval->LookupWithDefault(rtype->FieldOffset("log_remote"));
Val* interv = fval->LookupWithDefault(rtype->FieldOffset("interv"));
Val* postprocessor = fval->LookupWithDefault(rtype->FieldOffset("postprocessor"));
Val* config = fval->LookupWithDefault(rtype->FieldOffset("config"));
Filter* filter = new Filter;
filter->name = name->AsString()->CheckString();
@ -529,6 +532,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
filter->remote = log_remote->AsBool();
filter->interval = interv->AsInterval();
filter->postprocessor = postprocessor ? postprocessor->AsFunc() : 0;
filter->config = config->Ref()->AsTableVal();
Unref(name);
Unref(pred);
@ -537,6 +541,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
Unref(log_remote);
Unref(interv);
Unref(postprocessor);
Unref(config);
// Build the list of fields that the filter wants included, including
// potentially rolling out fields.
@ -764,8 +769,27 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new threading::Field(*filter->fields[j]);
WriterBackend::WriterInfo info;
info.path = path;
HashKey* k;
IterCookie* c = filter->config->AsTable()->InitForIteration();
TableEntryVal* v;
while ( (v = filter->config->AsTable()->NextEntry(k, c)) )
{
ListVal* index = filter->config->RecoverIndex(k);
string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString();
info.config.insert(std::make_pair(key, value));
Unref(index);
delete k;
}
// CreateWriter() will set the other fields in info.
writer = CreateWriter(stream->id, filter->writer,
path, filter->num_fields,
info, filter->num_fields,
arg_fields, filter->local, filter->remote);
if ( ! writer )
@ -773,7 +797,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
Unref(columns);
return false;
}
}
// Alright, can do the write now.
@ -953,7 +976,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
return vals;
}
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, const WriterBackend::WriterInfo& info,
int num_fields, const threading::Field* const* fields, bool local, bool remote)
{
Stream* stream = FindStream(id);
@ -963,7 +986,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
return false;
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), info.path));
if ( w != stream->writers.end() )
// If we already have a writer for this. That's fine, we just
@ -973,8 +996,6 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
assert(writer_obj);
writer_obj->Init(path, num_fields, fields);
WriterInfo* winfo = new WriterInfo;
winfo->type = writer->Ref()->AsEnumVal();
winfo->writer = writer_obj;
@ -982,6 +1003,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
winfo->rotation_timer = 0;
winfo->interval = 0;
winfo->postprocessor = 0;
winfo->info = info;
// Search for a corresponding filter for the writer/path pair and use its
// rotation settings. If no matching filter is found, fall back on
@ -993,7 +1015,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
{
Filter* f = *it;
if ( f->writer->AsEnum() == writer->AsEnum() &&
f->path == winfo->writer->Path() )
f->path == winfo->writer->info.path )
{
found_filter_match = true;
winfo->interval = f->interval;
@ -1012,9 +1034,19 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
InstallRotationTimer(winfo);
stream->writers.insert(
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), path),
Stream::WriterMap::value_type(Stream::WriterPathPair(writer->AsEnum(), info.path),
winfo));
// Still need to set the WriterInfo's rotation parameters, which we
// computed above.
const char* base_time = log_rotate_base_time ?
log_rotate_base_time->AsString()->CheckString() : 0;
winfo->info.rotation_interval = winfo->interval;
winfo->info.rotation_base = parse_rotate_base_time(base_time);
writer_obj->Init(winfo->info, num_fields, fields);
return writer_obj;
}
@ -1093,7 +1125,7 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
remote_serializer->SendLogCreateWriter(peer, (*s)->id,
&writer_val,
i->first.second,
i->second->info,
writer->NumFields(),
writer->Fields());
}
@ -1218,8 +1250,9 @@ void Manager::InstallRotationTimer(WriterInfo* winfo)
const char* base_time = log_rotate_base_time ?
log_rotate_base_time->AsString()->CheckString() : 0;
double base = parse_rotate_base_time(base_time);
double delta_t =
calc_next_rotate(rotation_interval, base_time);
calc_next_rotate(network_time, rotation_interval, base);
winfo->rotation_timer =
new RotationTimer(network_time + delta_t, winfo, true);
@ -1246,7 +1279,7 @@ void Manager::Rotate(WriterInfo* winfo)
localtime_r(&teatime, &tm);
strftime(buf, sizeof(buf), date_fmt, &tm);
string tmp = string(fmt("%s-%s", winfo->writer->Path().c_str(), buf));
string tmp = string(fmt("%s-%s", winfo->writer->Info().path.c_str(), buf));
// Trigger the rotation.
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
@ -1274,7 +1307,7 @@ bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string o
RecordVal* info = new RecordVal(BifType::Record::Log::RotationInfo);
info->Assign(0, winfo->type->Ref());
info->Assign(1, new StringVal(new_name.c_str()));
info->Assign(2, new StringVal(winfo->writer->Path().c_str()));
info->Assign(2, new StringVal(winfo->writer->Info().path.c_str()));
info->Assign(3, new Val(open, TYPE_TIME));
info->Assign(4, new Val(close, TYPE_TIME));
info->Assign(5, new Val(terminating, TYPE_BOOL));