Merge remote-tracking branch 'origin/master' into topic/bernhard/sqlite

This commit is contained in:
Bernhard Amann 2012-10-08 10:31:22 -07:00
commit 87ef8fe649
360 changed files with 6010 additions and 990 deletions

View file

@ -95,6 +95,7 @@ struct Manager::WriterInfo {
Func* postprocessor;
WriterFrontend* writer;
WriterBackend::WriterInfo* info;
bool from_remote;
string instantiating_filter;
};
@ -249,6 +250,29 @@ Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer)
return 0;
}
bool Manager::CompareFields(const Filter* filter, const WriterFrontend* writer)
{
if ( filter->num_fields != writer->NumFields() )
return false;
for ( int i = 0; i < filter->num_fields; ++ i)
if ( filter->fields[i]->type != writer->Fields()[i]->type )
return false;
return true;
}
bool Manager::CheckFilterWriterConflict(const WriterInfo* winfo, const Filter* filter)
{
if ( winfo->from_remote )
// If the writer was instantiated as a result of remote logging, then
// a filter and writer are only compatible if field types match
return ! CompareFields(filter, winfo->writer);
else
// If the writer was instantiated locally, it is bound to one filter
return winfo->instantiating_filter != filter->name;
}
void Manager::RemoveDisabledWriters(Stream* stream)
{
list<Stream::WriterPathPair> disabled;
@ -695,16 +719,13 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
int result = 1;
try
Val* v = filter->pred->Call(&vl);
if ( v )
{
Val* v = filter->pred->Call(&vl);
result = v->AsBool();
Unref(v);
}
catch ( InterpreterException& e )
{ /* Already reported. */ }
if ( ! result )
continue;
}
@ -735,15 +756,10 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
Val* v = 0;
try
{
v = filter->path_func->Call(&vl);
}
v = filter->path_func->Call(&vl);
catch ( InterpreterException& e )
{
if ( ! v )
return false;
}
if ( ! v->Type()->Tag() == TYPE_STRING )
{
@ -767,22 +783,43 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
#endif
}
Stream::WriterPathPair wpp(filter->writer->AsEnum(), path);
// See if we already have a writer for this path.
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(filter->writer->AsEnum(), path));
Stream::WriterMap::iterator w = stream->writers.find(wpp);
if ( w != stream->writers.end() &&
CheckFilterWriterConflict(w->second, filter) )
{
// Auto-correct path due to conflict over the writer/path pairs.
string instantiator = w->second->instantiating_filter;
string new_path;
unsigned int i = 2;
do {
char num[32];
snprintf(num, sizeof(num), "-%u", i++);
new_path = path + num;
wpp.second = new_path;
w = stream->writers.find(wpp);
} while ( w != stream->writers.end() &&
CheckFilterWriterConflict(w->second, filter) );
Unref(filter->path_val);
filter->path_val = new StringVal(new_path.c_str());
reporter->Warning("Write using filter '%s' on path '%s' changed to"
" use new path '%s' to avoid conflict with filter '%s'",
filter->name.c_str(), path.c_str(), new_path.c_str(),
instantiator.c_str());
path = filter->path = filter->path_val->AsString()->CheckString();
}
WriterFrontend* writer = 0;
if ( w != stream->writers.end() )
{
if ( w->second->instantiating_filter != filter->name )
{
reporter->Warning("Skipping write to filter '%s' on path '%s'"
" because filter '%s' has already instantiated the same"
" writer type for that path", filter->name.c_str(),
filter->path.c_str(), w->second->instantiating_filter.c_str());
continue;
}
// We know this writer already.
writer = w->second->writer;
}
@ -819,8 +856,8 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
// CreateWriter() will set the other fields in info.
writer = CreateWriter(stream->id, filter->writer,
info, filter->num_fields,
arg_fields, filter->local, filter->remote, filter->name);
info, filter->num_fields, arg_fields, filter->local,
filter->remote, false, filter->name);
if ( ! writer )
{
@ -1019,7 +1056,7 @@ threading::Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
}
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
int num_fields, const threading::Field* const* fields, bool local, bool remote,
int num_fields, const threading::Field* const* fields, bool local, bool remote, bool from_remote,
const string& instantiating_filter)
{
Stream* stream = FindStream(id);
@ -1044,6 +1081,7 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, WriterBacken
winfo->interval = 0;
winfo->postprocessor = 0;
winfo->info = info;
winfo->from_remote = from_remote;
winfo->instantiating_filter = instantiating_filter;
// Search for a corresponding filter for the writer/path pair and use its
@ -1210,12 +1248,16 @@ bool Manager::Flush(EnumVal* id)
void Manager::Terminate()
{
// Make sure we process all the pending rotations.
while ( rotations_pending )
while ( rotations_pending > 0 )
{
thread_mgr->ForceProcessing(); // A blatant layering violation ...
usleep(1000);
}
if ( rotations_pending < 0 )
reporter->InternalError("Negative pending log rotations: %d", rotations_pending);
for ( vector<Stream *>::iterator s = streams.begin(); s != streams.end(); ++s )
{
if ( ! *s )
@ -1329,13 +1371,18 @@ void Manager::Rotate(WriterInfo* winfo)
}
bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating)
double open, double close, bool success, bool terminating)
{
assert(writer);
--rotations_pending;
if ( ! writer )
// Writer didn't produce local output.
if ( ! success )
{
DBG_LOG(DBG_LOGGING, "Non-successful rotating writer '%s', file '%s' at %.6f,",
writer->Name(), filename, network_time);
return true;
}
DBG_LOG(DBG_LOGGING, "Finished rotating %s at %.6f, new name %s",
writer->Name(), network_time, new_name);
@ -1369,16 +1416,12 @@ bool Manager::FinishedRotation(WriterFrontend* writer, const char* new_name, con
int result = 0;
try
Val* v = func->Call(&vl);
if ( v )
{
Val* v = func->Call(&vl);
result = v->AsBool();
Unref(v);
}
catch ( InterpreterException& e )
{ /* Already reported. */ }
return result;
}

View file

@ -153,6 +153,7 @@ public:
protected:
friend class WriterFrontend;
friend class RotationFinishedMessage;
friend class RotationFailedMessage;
friend class ::RemoteSerializer;
friend class ::RotationTimer;
@ -165,7 +166,7 @@ protected:
// Takes ownership of fields and info.
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, WriterBackend::WriterInfo* info,
int num_fields, const threading::Field* const* fields,
bool local, bool remote, const string& instantiating_filter="");
bool local, bool remote, bool from_remote, const string& instantiating_filter="");
// Takes ownership of values..
bool Write(EnumVal* id, EnumVal* writer, string path,
@ -176,7 +177,7 @@ protected:
// Signals that a file has been rotated.
bool FinishedRotation(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating);
double open, double close, bool success, bool terminating);
// Deletes the values as passed into Write().
void DeleteVals(int num_fields, threading::Value** vals);
@ -199,6 +200,8 @@ private:
void Rotate(WriterInfo* info);
Filter* FindFilter(EnumVal* id, StringVal* filter);
WriterInfo* FindWriter(WriterFrontend* writer);
bool CompareFields(const Filter* filter, const WriterFrontend* writer);
bool CheckFilterWriterConflict(const WriterInfo* winfo, const Filter* filter);
vector<Stream *> streams; // Indexed by stream enum.
int rotations_pending; // Number of rotations not yet finished.

View file

@ -19,10 +19,10 @@ class RotationFinishedMessage : public threading::OutputMessage<WriterFrontend>
{
public:
RotationFinishedMessage(WriterFrontend* writer, const char* new_name, const char* old_name,
double open, double close, bool terminating)
double open, double close, bool success, bool terminating)
: threading::OutputMessage<WriterFrontend>("RotationFinished", writer),
new_name(copy_string(new_name)), old_name(copy_string(old_name)), open(open),
close(close), terminating(terminating) { }
close(close), success(success), terminating(terminating) { }
virtual ~RotationFinishedMessage()
{
@ -32,7 +32,7 @@ public:
virtual bool Process()
{
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, terminating);
return log_mgr->FinishedRotation(Object(), new_name, old_name, open, close, success, terminating);
}
private:
@ -40,6 +40,7 @@ private:
const char* old_name;
double open;
double close;
bool success;
bool terminating;
};
@ -126,6 +127,7 @@ WriterBackend::WriterBackend(WriterFrontend* arg_frontend) : MsgThread()
buffering = true;
frontend = arg_frontend;
info = new WriterInfo(frontend->Info());
rotation_counter = 0;
SetName(frontend->Name());
}
@ -160,7 +162,15 @@ void WriterBackend::DeleteVals(int num_writes, Value*** vals)
bool WriterBackend::FinishedRotation(const char* new_name, const char* old_name,
double open, double close, bool terminating)
{
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, terminating));
--rotation_counter;
SendOut(new RotationFinishedMessage(frontend, new_name, old_name, open, close, true, terminating));
return true;
}
bool WriterBackend::FinishedRotation()
{
--rotation_counter;
SendOut(new RotationFinishedMessage(frontend, 0, 0, 0, 0, false, false));
return true;
}
@ -174,6 +184,9 @@ bool WriterBackend::Init(int arg_num_fields, const Field* const* arg_fields)
num_fields = arg_num_fields;
fields = arg_fields;
if ( Failed() )
return true;
if ( ! DoInit(*info, arg_num_fields, arg_fields) )
{
DisableFrontend();
@ -222,12 +235,15 @@ bool WriterBackend::Write(int arg_num_fields, int num_writes, Value*** vals)
bool success = true;
for ( int j = 0; j < num_writes; j++ )
if ( ! Failed() )
{
success = DoWrite(num_fields, fields, vals[j]);
for ( int j = 0; j < num_writes; j++ )
{
success = DoWrite(num_fields, fields, vals[j]);
if ( ! success )
break;
if ( ! success )
break;
}
}
DeleteVals(num_writes, vals);
@ -244,6 +260,9 @@ bool WriterBackend::SetBuf(bool enabled)
// No change.
return true;
if ( Failed() )
return true;
buffering = enabled;
if ( ! DoSetBuf(enabled) )
@ -258,17 +277,32 @@ bool WriterBackend::SetBuf(bool enabled)
bool WriterBackend::Rotate(const char* rotated_path, double open,
double close, bool terminating)
{
if ( Failed() )
return true;
rotation_counter = 1;
if ( ! DoRotate(rotated_path, open, close, terminating) )
{
DisableFrontend();
return false;
}
// Insurance against broken writers.
if ( rotation_counter > 0 )
InternalError(Fmt("writer %s did not call FinishedRotation() in DoRotation()", Name()));
if ( rotation_counter < 0 )
InternalError(Fmt("writer %s called FinishedRotation() more than once in DoRotation()", Name()));
return true;
}
bool WriterBackend::Flush(double network_time)
{
if ( Failed() )
return true;
if ( ! DoFlush(network_time) )
{
DisableFrontend();
@ -280,11 +314,17 @@ bool WriterBackend::Flush(double network_time)
bool WriterBackend::OnFinish(double network_time)
{
if ( Failed() )
return true;
return DoFinish(network_time);
}
bool WriterBackend::OnHeartbeat(double network_time, double current_time)
{
if ( Failed() )
return true;
SendOut(new FlushWriteBufferMessage(frontend));
return DoHeartbeat(network_time, current_time);
}

View file

@ -182,6 +182,8 @@ public:
/**
* Disables the frontend that has instantiated this backend. Once
* disabled,the frontend will not send any further message over.
*
* TODO: Do we still need this method (and the corresponding message)?
*/
void DisableFrontend();
@ -208,11 +210,15 @@ public:
bool IsBuf() { return buffering; }
/**
* Signals that a file has been rotated. This must be called by a
* writer's implementation of DoRotate() once rotation has finished.
* Signals that a file has been successfully rotated and any
* potential post-processor can now run.
*
* Most of the parameters should be passed through from DoRotate().
*
* Note: Exactly one of the two FinishedRotation() methods must be
* called by a writer's implementation of DoRotate() once rotation
* has finished.
*
* @param new_name The filename of the rotated file.
*
* @param old_name The filename of the original file.
@ -227,6 +233,29 @@ public:
bool FinishedRotation(const char* new_name, const char* old_name,
double open, double close, bool terminating);
/**
* Signals that a file rotation request has been processed, but no
* further post-processing needs to be performed (either because
* there was an error, or there was nothing to rotate to begin with
* with this writer).
*
* Note: Exactly one of the two FinishedRotation() methods must be
* called by a writer's implementation of DoRotate() once rotation
* has finished.
*
* @param new_name The filename of the rotated file.
*
* @param old_name The filename of the original file.
*
* @param open: The timestamp when the original file was opened.
*
* @param close: The timestamp when the origina file was closed.
*
* @param terminating: True if the original rotation request occured
* due to the main Bro process shutting down.
*/
bool FinishedRotation();
/** Helper method to render an IP address as a string.
*
* @param addr The address.
@ -323,8 +352,8 @@ protected:
* Writer-specific method implementing log rotation. Most directly
* this only applies to writers writing into files, which should then
* close the current file and open a new one. However, a writer may
* also trigger other apppropiate actions if semantics are similar. *
* Once rotation has finished, the implementation must call
* also trigger other apppropiate actions if semantics are similar.
* Once rotation has finished, the implementation *must* call
* FinishedRotation() to signal the log manager that potential
* postprocessors can now run.
*
@ -386,6 +415,8 @@ private:
int num_fields; // Number of log fields.
const threading::Field* const* fields; // Log fields.
bool buffering; // True if buffering is enabled.
int rotation_counter; // Tracks FinishedRotation() calls.
};

View file

@ -248,9 +248,8 @@ void WriterFrontend::Rotate(const char* rotated_path, double open, double close,
if ( backend )
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
else
// Still signal log manager that we're done, but signal that
// nothing happened by setting the writer to zeri.
log_mgr->FinishedRotation(0, "", rotated_path, open, close, terminating);
// Still signal log manager that we're done.
log_mgr->FinishedRotation(this, 0, 0, 0, 0, false, terminating);
}
void WriterFrontend::DeleteVals(Value** vals)

View file

@ -81,18 +81,15 @@ void Ascii::CloseFile(double t)
return;
if ( include_meta )
{
string ts = t ? Timestamp(t) : string("<abnormal termination>");
WriteHeaderField("end", ts);
}
WriteHeaderField("close", Timestamp(0));
close(fd);
safe_close(fd);
fd = 0;
}
bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields)
{
assert(! fd);
assert(! fd);
string path = info.path;
@ -124,8 +121,6 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
if ( ! safe_write(fd, str.c_str(), str.length()) )
goto write_error;
string ts = Timestamp(info.network_time);
if ( ! (WriteHeaderField("set_separator", get_escaped_string(
string(set_separator, set_separator_len), false)) &&
WriteHeaderField("empty_field", get_escaped_string(
@ -133,7 +128,7 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
WriteHeaderField("unset_field", get_escaped_string(
string(unset_field, unset_field_len), false)) &&
WriteHeaderField("path", get_escaped_string(path, false)) &&
WriteHeaderField("start", ts)) )
WriteHeaderField("open", Timestamp(0))) )
goto write_error;
for ( int i = 0; i < num_fields; ++i )
@ -364,7 +359,7 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
if ( ! safe_write(fd, bytes, len) )
goto write_error;
if ( IsBuf() )
if ( ! IsBuf() )
fsync(fd);
return true;
@ -378,7 +373,10 @@ bool Ascii::DoRotate(const char* rotated_path, double open, double close, bool t
{
// Don't rotate special files or if there's not one currently open.
if ( ! fd || IsSpecial(Info().path) )
{
FinishedRotation();
return true;
}
CloseFile(close);
@ -419,6 +417,16 @@ string Ascii::Timestamp(double t)
{
time_t teatime = time_t(t);
if ( ! teatime )
{
// Use wall clock.
struct timeval tv;
if ( gettimeofday(&tv, 0) < 0 )
Error("gettimeofday failed");
else
teatime = tv.tv_sec;
}
struct tm tmbuf;
struct tm* tm = localtime_r(&teatime, &tmbuf);

View file

@ -35,7 +35,7 @@ private:
bool DoWriteOne(ODesc* desc, threading::Value* val, const threading::Field* field);
bool WriteHeaderField(const string& key, const string& value);
void CloseFile(double t);
string Timestamp(double t);
string Timestamp(double t); // Uses current time if t is zero.
int fd;
string fname;

View file

@ -243,8 +243,25 @@ bool DataSeries::OpenLog(string path)
log_file->writeExtentLibrary(log_types);
for( size_t i = 0; i < schema_list.size(); ++i )
extents.insert(std::make_pair(schema_list[i].field_name,
GeneralField::create(log_series, schema_list[i].field_name)));
{
string fn = schema_list[i].field_name;
GeneralField* gf = 0;
#ifdef USE_PERFTOOLS_DEBUG
{
// GeneralField isn't cleaning up some results of xml parsing, reported
// here: https://github.com/dataseries/DataSeries/issues/1
// Ignore for now to make leak tests pass. There's confidence that
// we do clean up the GeneralField* since the ExtentSeries dtor for
// member log_series would trigger an assert if dynamically allocated
// fields aren't deleted beforehand.
HeapLeakChecker::Disabler disabler;
#endif
gf = GeneralField::create(log_series, fn);
#ifdef USE_PERFTOOLS_DEBUG
}
#endif
extents.insert(std::make_pair(fn, gf));
}
if ( ds_extent_size < ROW_MIN )
{

View file

@ -48,7 +48,7 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
last_send = current_time();
failing = false;
transfer_timeout = BifConst::LogElasticSearch::transfer_timeout * 1000;
transfer_timeout = static_cast<long>(BifConst::LogElasticSearch::transfer_timeout);
curl_handle = HTTPSetup();
}
@ -322,9 +322,7 @@ bool ElasticSearch::DoRotate(const char* rotated_path, double open, double close
}
if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) )
{
Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
}
return true;
}
@ -359,10 +357,10 @@ CURL* ElasticSearch::HTTPSetup()
return handle;
}
bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
size_t ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
{
//TODO: Do some verification on the result?
return true;
return size;
}
bool ElasticSearch::HTTPSend(CURL *handle)
@ -373,7 +371,11 @@ bool ElasticSearch::HTTPSend(CURL *handle)
// The best (only?) way to disable that is to just use HTTP 1.0
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
//curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, transfer_timeout);
// Some timeout options. These will need more attention later.
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, transfer_timeout);
curl_easy_setopt(handle, CURLOPT_TIMEOUT, transfer_timeout);
curl_easy_setopt(handle, CURLOPT_DNS_CACHE_TIMEOUT, 60*60);
CURLcode return_code = curl_easy_perform(handle);
@ -386,12 +388,16 @@ bool ElasticSearch::HTTPSend(CURL *handle)
{
if ( ! failing )
Error(Fmt("ElasticSearch server may not be accessible."));
break;
}
case CURLE_OPERATION_TIMEDOUT:
{
if ( ! failing )
Warning(Fmt("HTTP operation with elasticsearch server timed out at %" PRIu64 " msecs.", transfer_timeout));
break;
}
case CURLE_OK:
@ -403,10 +409,13 @@ bool ElasticSearch::HTTPSend(CURL *handle)
return true;
else if ( ! failing )
Error(Fmt("Received a non-successful status code back from ElasticSearch server, check the elasticsearch server log."));
break;
}
default:
{
break;
}
}
// The "successful" return happens above

View file

@ -45,7 +45,7 @@ private:
bool UpdateIndex(double now, double rinterval, double rbase);
CURL* HTTPSetup();
bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
size_t HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
bool HTTPSend(CURL *handle);
// Buffers, etc.
@ -68,7 +68,7 @@ private:
string path;
string index_prefix;
uint64 transfer_timeout;
long transfer_timeout;
bool failing;
uint64 batch_size;