Fix for when not producing local output; that hung.

* origin/topic/robin/dataseries:
  Moving trace for rotation test into traces directory.
  Fixing a rotation race condition at termination.
  Portability fixes.
  Extending DS docs with some examples.
  Updating doc.
  Fixing pack_scale and time-as-int.
  Adding format specifier to DS spec to print out double as %.6f.
  DataSeries updates and fixes.
  DataSeries tuning.
  Tweaking DataSeries support.
  Extending log post-processor call to include the name of the writer.
  Removing an unnecessary const cast.
  DataSeries TODO list with open issues/questions.
  Starting DataSeries HowTo.
  Additional test output canonification for ds2txt's timestamps.
  In threads, an internal error now immediately aborts.
  DataSeries cleanup.
  Working on DataSeries support.
  Merging in DataSeries support from topic/gilbert/logging.
  Fixing  threads' DoFinish() method.
This commit is contained in:
Robin Sommer 2012-05-17 12:38:47 -07:00
commit 7cc863c5fc
52 changed files with 1844 additions and 133 deletions

View file

@ -7,6 +7,7 @@
#include "../NetVar.h"
#include "../Net.h"
#include "threading/Manager.h"
#include "threading/SerialTypes.h"
#include "Manager.h"
@ -16,9 +17,11 @@
#include "writers/Ascii.h"
#include "writers/None.h"
#ifdef USE_DATASERIES
#include "writers/DataSeries.h"
#endif
using namespace logging;
using threading::Value;
using threading::Field;
// Structure describing a log writer type.
struct WriterDefinition {
@ -32,6 +35,9 @@ struct WriterDefinition {
WriterDefinition log_writers[] = {
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
#ifdef USE_DATASERIES
{ BifEnum::Log::WRITER_DATASERIES, "DataSeries", 0, writer::DataSeries::Instantiate },
#endif
// End marker, don't touch.
{ BifEnum::Log::WRITER_DEFAULT, "None", 0, (WriterBackend* (*)(WriterFrontend* frontend))0 }
@ -51,7 +57,7 @@ struct Manager::Filter {
Func* postprocessor;
int num_fields;
Field** fields;
threading::Field** fields;
// Vector indexed by field number. Each element is a list of record
// indices defining a path leading to the value across potential
@ -119,6 +125,7 @@ Manager::Stream::~Stream()
Manager::Manager()
{
rotations_pending = 0;
}
Manager::~Manager()
@ -127,6 +134,16 @@ Manager::~Manager()
delete *s;
}
list<string> Manager::SupportedFormats()
{
list<string> formats;
for ( WriterDefinition* ld = log_writers; ld->type != BifEnum::Log::WRITER_DEFAULT; ++ld )
formats.push_back(ld->name);
return formats;
}
WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
{
WriterDefinition* ld = log_writers;
@ -135,7 +152,7 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
{
if ( ld->type == BifEnum::Log::WRITER_DEFAULT )
{
reporter->Error("unknow writer when creating writer");
reporter->Error("unknown writer type requested");
return 0;
}
@ -159,10 +176,8 @@ WriterBackend* Manager::CreateBackend(WriterFrontend* frontend, bro_int_t type)
// function.
ld->factory = 0;
DBG_LOG(DBG_LOGGING, "failed to init writer class %s",
ld->name);
return false;
reporter->Error("initialization of writer %s failed", ld->name);
return 0;
}
}
@ -449,7 +464,7 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
filter->indices.push_back(new_indices);
filter->fields = (Field**)
filter->fields = (threading::Field**)
realloc(filter->fields,
sizeof(Field) * ++filter->num_fields);
@ -459,7 +474,7 @@ bool Manager::TraverseRecord(Stream* stream, Filter* filter, RecordType* rt,
return false;
}
Field* field = new Field();
threading::Field* field = new threading::Field();
field->name = new_path;
field->type = t->Tag();
@ -571,7 +586,7 @@ bool Manager::AddFilter(EnumVal* id, RecordVal* fval)
for ( int i = 0; i < filter->num_fields; i++ )
{
Field* field = filter->fields[i];
threading::Field* field = filter->fields[i];
DBG_LOG(DBG_LOGGING, " field %10s: %s",
field->name.c_str(), type_name(field->type));
}
@ -743,10 +758,10 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
// Copy the fields for WriterFrontend::Init() as it
// will take ownership.
Field** arg_fields = new Field*[filter->num_fields];
threading::Field** arg_fields = new threading::Field*[filter->num_fields];
for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new Field(*filter->fields[j]);
arg_fields[j] = new threading::Field(*filter->fields[j]);
writer = CreateWriter(stream->id, filter->writer,
path, filter->num_fields,
@ -897,10 +912,10 @@ threading::Value* Manager::ValToLogVal(Val* val, BroType* ty)
return lval;
}
Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
RecordVal* columns)
{
Value** vals = new Value*[filter->num_fields];
threading::Value** vals = new threading::Value*[filter->num_fields];
for ( int i = 0; i < filter->num_fields; ++i )
{
@ -919,7 +934,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
if ( ! val )
{
// Value, or any of its parents, is not set.
vals[i] = new Value(filter->fields[i]->type, false);
vals[i] = new threading::Value(filter->fields[i]->type, false);
break;
}
}
@ -932,7 +947,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
}
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, const Field* const* fields, bool local, bool remote)
int num_fields, const threading::Field* const* fields, bool local, bool remote)
{
Stream* stream = FindStream(id);
@ -996,7 +1011,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
return writer_obj;
}
void Manager::DeleteVals(int num_fields, Value** vals)
void Manager::DeleteVals(int num_fields, threading::Value** vals)
{
// Note this code is duplicated in WriterBackend::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
@ -1006,7 +1021,7 @@ void Manager::DeleteVals(int num_fields, Value** vals)
}
bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
Value** vals)
threading::Value** vals)
{
Stream* stream = FindStream(id);
@ -1113,10 +1128,19 @@ bool Manager::Flush(EnumVal* id)
void Manager::Terminate()
{
// Make sure we process all the pending rotations.
while ( rotations_pending )
{
thread_mgr->ForceProcessing(); // A blatant layering violation ...
usleep(1000);
}
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
if ( *s )
Flush((*s)->id);
if ( ! *s )
continue;
Flush((*s)->id);
}
}
@ -1219,11 +1243,19 @@ void Manager::Rotate(WriterInfo* winfo)
// Trigger the rotation.
winfo->writer->Rotate(tmp, winfo->open_time, network_time, terminating);
++rotations_pending;
}
bool Manager::FinishedRotation(WriterFrontend* writer, string new_name, string old_name,
double open, double close, bool terminating)
{
--rotations_pending;
if ( ! writer )
// Writer didn't produce local output.
return true;
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
writer->Path().c_str(), network_time, new_name.c_str());