Merge remote-tracking branch 'origin/topic/robin/log-threads' into topic/bernhard/input-threads

Conflicts:
	src/threading/Manager.cc
This commit is contained in:
Bernhard Amann 2012-03-18 11:03:04 -07:00
commit b34a0b6deb
16 changed files with 320 additions and 117 deletions

View file

@ -234,7 +234,7 @@ static const int PRINT_BUFFER_SIZE = 10 * 1024;
static const int SOCKBUF_SIZE = 1024 * 1024;
// Buffer size for remote-log data.
static const int LOG_BUFFER_SIZE = 50 * 1024;
static const int LOG_BUFFER_SIZE = 512;
struct ping_args {
uint32 seq;
@ -532,6 +532,7 @@ RemoteSerializer::RemoteSerializer()
terminating = false;
in_sync = 0;
last_flush = 0;
received_logs = 0;
}
RemoteSerializer::~RemoteSerializer()
@ -1353,6 +1354,14 @@ double RemoteSerializer::NextTimestamp(double* local_network_time)
{
Poll(false);
if ( received_logs > 0 )
{
// If we processed logs last time, assume there's more.
idle = false;
received_logs = 0;
return timer_mgr->Time();
}
double et = events.length() ? events[0]->time : -1;
double pt = packets.length() ? packets[0]->time : -1;
@ -2552,7 +2561,9 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st
if ( ! peer->logs_requested )
return false;
assert(peer->log_buffer);
if ( ! peer->log_buffer )
// Peer shutting down.
return false;
// Serialize the log record entry.
@ -2587,8 +2598,11 @@ bool RemoteSerializer::SendLogWrite(Peer* peer, EnumVal* id, EnumVal* writer, st
if ( len > (LOG_BUFFER_SIZE - peer->log_buffer_used) || (network_time - last_flush > 1.0) )
{
if ( ! FlushLogBuffer(peer) )
{
delete [] data;
return false;
}
}
// If the data is actually larger than our complete buffer, just send it out.
if ( len > LOG_BUFFER_SIZE )
@ -2631,6 +2645,12 @@ bool RemoteSerializer::ProcessLogCreateWriter()
if ( current_peer->state == Peer::CLOSING )
return false;
#ifdef USE_PERFTOOLS
// Don't track allocations here, they'll be released only after the
// main loop exits. And it's just a tiny amount anyway.
HeapLeakChecker::Disabler disabler;
#endif
assert(current_args);
EnumVal* id_val = 0;
@ -2666,7 +2686,7 @@ bool RemoteSerializer::ProcessLogCreateWriter()
id_val = new EnumVal(id, BifType::Enum::Log::ID);
writer_val = new EnumVal(writer, BifType::Enum::Log::Writer);
if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields) )
if ( ! log_mgr->CreateWriter(id_val, writer_val, path, num_fields, fields, true, false) )
goto error;
Unref(id_val);
@ -2735,6 +2755,8 @@ bool RemoteSerializer::ProcessLogWrite()
fmt.EndRead();
++received_logs;
return true;
error:
@ -3376,6 +3398,9 @@ void SocketComm::Run()
small_timeout.tv_usec =
io->CanWrite() || io->CanRead() ? 1 : 10;
if ( ! io->CanWrite() )
usleep(10);
int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except,
&small_timeout);

View file

@ -338,6 +338,7 @@ private:
int propagate_accesses;
bool ignore_accesses;
bool terminating;
int received_logs;
Peer* source_peer;
PeerID id_counter; // Keeps track of assigned IDs.
uint32 current_sync_point;

View file

@ -105,9 +105,6 @@ Manager::Stream::~Stream()
{
WriterInfo* winfo = i->second;
if ( ! winfo )
continue;
if ( winfo->rotation_timer )
timer_mgr->Cancel(winfo->rotation_timer);
@ -207,7 +204,7 @@ Manager::WriterInfo* Manager::FindWriter(WriterFrontend* writer)
{
WriterInfo* winfo = i->second;
if ( winfo && winfo->writer == writer )
if ( winfo->writer == writer )
return winfo;
}
}
@ -221,7 +218,7 @@ void Manager::RemoveDisabledWriters(Stream* stream)
for ( Stream::WriterMap::iterator j = stream->writers.begin(); j != stream->writers.end(); j++ )
{
if ( j->second && j->second->writer->Disabled() )
if ( j->second->writer->Disabled() )
{
j->second->writer->Stop();
delete j->second;
@ -680,11 +677,11 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
Val* path_arg;
if ( filter->path_val )
path_arg = filter->path_val;
path_arg = filter->path_val->Ref();
else
path_arg = new StringVal("");
vl.append(path_arg->Ref());
vl.append(path_arg);
Val* rec_arg;
BroType* rt = filter->path_func->FType()->Args()->FieldType("rec");
@ -718,7 +715,6 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
if ( ! filter->path_val )
{
Unref(path_arg);
filter->path = v->AsString()->CheckString();
filter->path_val = v->Ref();
}
@ -740,7 +736,7 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
if ( w != stream->writers.end() )
// We know this writer already.
writer = w->second ? w->second->writer : 0;
writer = w->second->writer;
else
{
@ -753,64 +749,25 @@ bool Manager::Write(EnumVal* id, RecordVal* columns)
for ( int j = 0; j < filter->num_fields; ++j )
arg_fields[j] = new Field(*filter->fields[j]);
if ( filter->remote )
remote_serializer->SendLogCreateWriter(stream->id,
filter->writer,
path,
filter->num_fields,
arg_fields);
if ( filter->local )
{
writer = CreateWriter(stream->id, filter->writer,
path, filter->num_fields,
arg_fields);
arg_fields, filter->local, filter->remote);
if ( ! writer )
{
Unref(columns);
return false;
}
}
else
{
// Insert a null pointer into the map to make
// sure we don't try creating it again.
stream->writers.insert(Stream::WriterMap::value_type(
Stream::WriterPathPair(filter->writer->AsEnum(), path), 0));
for( int i = 0; i < filter->num_fields; ++i)
delete arg_fields[i];
delete [] arg_fields;
}
}
// Alright, can do the write now.
if ( filter->local || filter->remote )
{
threading::Value** vals = RecordToFilterVals(stream, filter, columns);
if ( filter->remote )
remote_serializer->SendLogWrite(stream->id,
filter->writer,
path,
filter->num_fields,
vals);
if ( filter->local )
{
// Write takes ownership of vals.
assert(writer);
writer->Write(filter->num_fields, vals);
}
else
DeleteVals(filter->num_fields, vals);
}
#ifdef DEBUG
DBG_LOG(DBG_LOGGING, "Wrote record to filter '%s' on stream '%s'",
@ -976,7 +933,7 @@ Value** Manager::RecordToFilterVals(Stream* stream, Filter* filter,
}
WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, const Field* const* fields)
int num_fields, const Field* const* fields, bool local, bool remote)
{
Stream* stream = FindStream(id);
@ -987,12 +944,12 @@ WriterFrontend* Manager::CreateWriter(EnumVal* id, EnumVal* writer, string path,
Stream::WriterMap::iterator w =
stream->writers.find(Stream::WriterPathPair(writer->AsEnum(), path));
if ( w != stream->writers.end() && w->second )
if ( w != stream->writers.end() )
// We already have a writer for this. That's fine, we just
// return it.
return w->second->writer;
WriterFrontend* writer_obj = new WriterFrontend(writer->AsEnum());
WriterFrontend* writer_obj = new WriterFrontend(id, writer, local, remote);
assert(writer_obj);
writer_obj->Init(path, num_fields, fields);
@ -1089,7 +1046,6 @@ bool Manager::Write(EnumVal* id, EnumVal* writer, string path, int num_fields,
return false;
}
if ( w->second )
w->second->writer->Write(num_fields, vals);
DBG_LOG(DBG_LOGGING,
@ -1111,9 +1067,6 @@ void Manager::SendAllWritersTo(RemoteSerializer::PeerID peer)
for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ )
{
if ( ! i->second )
continue;
WriterFrontend* writer = i->second->writer;
EnumVal writer_val(i->first.first, BifType::Enum::Log::Writer);
@ -1134,10 +1087,7 @@ bool Manager::SetBuf(EnumVal* id, bool enabled)
for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ )
{
if ( i->second )
i->second->writer->SetBuf(enabled);
}
RemoveDisabledWriters(stream);
@ -1155,10 +1105,7 @@ bool Manager::Flush(EnumVal* id)
for ( Stream::WriterMap::iterator i = stream->writers.begin();
i != stream->writers.end(); i++ )
{
if ( i->second )
i->second->writer->Flush();
}
RemoveDisabledWriters(stream);

View file

@ -159,7 +159,8 @@ protected:
// Takes ownership of fields.
WriterFrontend* CreateWriter(EnumVal* id, EnumVal* writer, string path,
int num_fields, const threading::Field* const* fields);
int num_fields, const threading::Field* const* fields,
bool local, bool remote);
// Takes ownership of values..
bool Write(EnumVal* id, EnumVal* writer, string path,

View file

@ -99,21 +99,36 @@ public:
using namespace logging;
WriterFrontend::WriterFrontend(bro_int_t type)
WriterFrontend::WriterFrontend(EnumVal* arg_stream, EnumVal* arg_writer, bool arg_local, bool arg_remote)
{
stream = arg_stream;
writer = arg_writer;
Ref(stream);
Ref(writer);
disabled = initialized = false;
buf = true;
local = arg_local;
remote = arg_remote;
write_buffer = 0;
write_buffer_pos = 0;
ty_name = "<not set>";
backend = log_mgr->CreateBackend(this, type);
if ( local )
{
backend = log_mgr->CreateBackend(this, writer->AsEnum());
assert(backend);
backend->Start();
}
else
backend = 0;
}
// Destructor. Drops the references on the stream and writer enum values
// that the constructor acquired via Ref().
WriterFrontend::~WriterFrontend()
{
Unref(stream);
Unref(writer);
}
string WriterFrontend::Name() const
@ -128,6 +143,8 @@ void WriterFrontend::Stop()
{
FlushWriteBuffer();
SetDisable();
if ( backend )
backend->Stop();
}
@ -144,7 +161,17 @@ void WriterFrontend::Init(string arg_path, int arg_num_fields, const Field* cons
fields = arg_fields;
initialized = true;
if ( backend )
backend->SendIn(new InitMessage(backend, arg_path, arg_num_fields, arg_fields));
if ( remote )
remote_serializer->SendLogCreateWriter(stream,
writer,
arg_path,
arg_num_fields,
arg_fields);
}
void WriterFrontend::Write(int num_fields, Value** vals)
@ -152,6 +179,19 @@ void WriterFrontend::Write(int num_fields, Value** vals)
if ( disabled )
return;
if ( remote )
remote_serializer->SendLogWrite(stream,
writer,
path,
num_fields,
vals);
if ( ! backend )
{
DeleteVals(vals);
return;
}
if ( ! write_buffer )
{
// Need new buffer.
@ -173,6 +213,7 @@ void WriterFrontend::FlushWriteBuffer()
// Nothing to do.
return;
if ( backend )
backend->SendIn(new WriteMessage(backend, num_fields, write_buffer_pos, write_buffer));
// Clear buffer (no delete, we pass ownership to child thread.)
@ -187,6 +228,7 @@ void WriterFrontend::SetBuf(bool enabled)
buf = enabled;
if ( backend )
backend->SendIn(new SetBufMessage(backend, enabled));
if ( ! buf )
@ -200,6 +242,8 @@ void WriterFrontend::Flush()
return;
FlushWriteBuffer();
if ( backend )
backend->SendIn(new FlushMessage(backend));
}
@ -209,6 +253,8 @@ void WriterFrontend::Rotate(string rotated_path, double open, double close, bool
return;
FlushWriteBuffer();
if ( backend )
backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
}
@ -218,9 +264,20 @@ void WriterFrontend::Finish()
return;
FlushWriteBuffer();
if ( backend )
backend->SendIn(new FinishMessage(backend));
}
// Frees a value array handed to this frontend: deletes each of the first
// num_fields entries and then the array itself. num_fields is not a
// parameter here; presumably it is the frontend's own field count --
// TODO confirm it always matches the length of the array passed in.
void WriterFrontend::DeleteVals(Value** vals)
{
// Note this code is duplicated in Manager::DeleteVals().
for ( int i = 0; i < num_fields; i++ )
delete vals[i];
delete [] vals;
}

View file

@ -25,14 +25,21 @@ public:
/**
* Constructor.
*
* type: The backend writer type, with the value corresponding to the
* stream: The logging stream.
*
* writer: The backend writer type, with the value corresponding to the
* script-level \c Log::Writer enum (e.g., \a WRITER_ASCII). The
* frontend will internally instantiate a WriterBackend of the
* corresponding type.
*
* local: If true, the writer will instantiate a local backend.
*
* remote: If true, the writer will forward all data to remote
* clients.
*
* Frontends must only be instantiated by the main thread.
*/
WriterFrontend(bro_int_t type);
WriterFrontend(EnumVal* stream, EnumVal* writer, bool local, bool remote);
/**
* Destructor.
@ -187,10 +194,17 @@ public:
protected:
friend class Manager;
void DeleteVals(threading::Value** vals);
EnumVal* stream;
EnumVal* writer;
WriterBackend* backend; // The backend we have instantiated.
bool disabled; // True if disabled.
bool initialized; // True if initialized.
bool buf; // True if buffering is enabled (default).
bool local; // True if logging locally.
bool remote; // True if logging remotely.
string ty_name; // Name of the backend type. Set by the manager.
string path; // The log path.

View file

@ -20,8 +20,8 @@ BasicThread::BasicThread()
terminating = false;
pthread = 0;
buf = 0;
buf_len = 1024;
buf_len = 2048;
buf = (char*) malloc(buf_len);
name = Fmt("thread-%d", ++thread_counter);
@ -57,9 +57,6 @@ void BasicThread::SetOSName(const string& name)
const char* BasicThread::Fmt(const char* format, ...)
{
if ( ! buf )
buf = (char*) malloc(buf_len);
va_list al;
va_start(al, format);
int n = safe_vsnprintf(buf, buf_len, format, al);
@ -67,13 +64,15 @@ const char* BasicThread::Fmt(const char* format, ...)
if ( (unsigned int) n >= buf_len )
{ // Not enough room, grow the buffer.
buf_len = n + 32;
buf = (char*) realloc(buf, buf_len);
int tmp_len = n + 32;
char* tmp = (char*) malloc(tmp_len);
// Is it portable to restart?
va_start(al, format);
n = safe_vsnprintf(buf, buf_len, format, al);
n = safe_vsnprintf(tmp, tmp_len, format, al);
va_end(al);
free(tmp);
}
return buf;

View file

@ -102,26 +102,26 @@ void Manager::Process()
next_beat = 0;
}
if ( ! t->HasOut() )
continue;
while ( t->HasOut() )
{
Message* msg = t->RetrieveOut();
if ( msg->Process() )
{
//if ( network_time ) //&& network_time ) // FIXME: ask robin again if he needs this. makes input interface not work in bro_init.
//if ( network_time ) // FIXME: ask robin again if he needs this. makes input interface not work in bro_init.
did_process = true;
}
else
{
string s = msg->Name() + " failed, terminating thread " + t->Name() + " (in ThreadManager)";
string s = msg->Name() + " failed, terminating thread";
reporter->Error("%s", s.c_str());
t->Stop();
}
delete msg;
}
}
// fprintf(stderr, "P %.6f %.6f do_beat=%d did_process=%d next_next=%.6f\n", network_time, timer_mgr->Time(), do_beat, (int)did_process, next_beat);
}

View file

@ -281,7 +281,7 @@ void MsgThread::GetStats(Stats* stats)
{
stats->sent_in = cnt_sent_in;
stats->sent_out = cnt_sent_out;
stats->pending_in = cnt_sent_in - queue_in.Size();
stats->pending_out = cnt_sent_out - queue_out.Size();
stats->pending_in = queue_in.Size();
stats->pending_out = queue_out.Size();
}

View file

@ -0,0 +1,10 @@
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path metrics
#fields ts metric_id filter_name index.host index.str index.network value
#types time enum string addr string subnet count
1331256494.591966 TEST_METRIC foo-bar 6.5.4.3 - - 4
1331256494.591966 TEST_METRIC foo-bar 7.2.1.5 - - 2
1331256494.591966 TEST_METRIC foo-bar 1.2.3.4 - - 6

View file

@ -0,0 +1,10 @@
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path test.failure
#fields t id.orig_h id.orig_p id.resp_h id.resp_p status country
#types time addr port addr port string string
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure US
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure UK
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure MX

View file

@ -0,0 +1,12 @@
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path test
#fields t id.orig_h id.orig_p id.resp_h id.resp_p status country
#types time addr port addr port string string
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success unknown
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure US
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure UK
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success BR
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 failure MX

View file

@ -0,0 +1,9 @@
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path test.success
#fields t id.orig_h id.orig_p id.resp_h id.resp_p status country
#types time addr port addr port string string
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success unknown
1331256472.375609 1.2.3.4 1234 2.3.4.5 80 success BR

View file

@ -0,0 +1,39 @@
# Needs perftools support.
#
# @TEST-REQUIRES: bro --help 2>&1 | grep -q mem-leaks
# @TEST-EXEC: btest-bg-run manager-1 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=manager-1 bro -m %INPUT
# @TEST-EXEC: btest-bg-run proxy-1 BROPATH=$BROPATH:.. CLUSTER_NODE=proxy-1 bro %INPUT
# @TEST-EXEC: sleep 1
# @TEST-EXEC: btest-bg-run worker-1 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=worker-1 bro -m -r $TRACES/web.trace --pseudo-realtime %INPUT
# @TEST-EXEC: btest-bg-run worker-2 HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local BROPATH=$BROPATH:.. CLUSTER_NODE=worker-2 bro -m -r $TRACES/web.trace --pseudo-realtime %INPUT
# @TEST-EXEC: btest-bg-wait -k 30
# @TEST-EXEC: btest-diff manager-1/metrics.log
@TEST-START-FILE cluster-layout.bro
redef Cluster::nodes = {
["manager-1"] = [$node_type=Cluster::MANAGER, $ip=127.0.0.1, $p=37757/tcp, $workers=set("worker-1")],
["proxy-1"] = [$node_type=Cluster::PROXY, $ip=127.0.0.1, $p=37758/tcp, $manager="manager-1", $workers=set("worker-1")],
["worker-1"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37760/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth0"],
["worker-2"] = [$node_type=Cluster::WORKER, $ip=127.0.0.1, $p=37761/tcp, $manager="manager-1", $proxy="proxy-1", $interface="eth1"],
};
@TEST-END-FILE
redef Log::default_rotation_interval = 0secs;
redef enum Metrics::ID += {
TEST_METRIC,
};
# Test setup: registers a metrics filter named "foo-bar" on TEST_METRIC with
# a 3 second break interval, and, on worker nodes only, feeds three data
# points for distinct hosts into that metric.
event bro_init() &priority=5
{
Metrics::add_filter(TEST_METRIC,
[$name="foo-bar",
$break_interval=3secs]);
# Only nodes whose local type is WORKER contribute data.
if ( Cluster::local_node_type() == Cluster::WORKER )
{
Metrics::add_data(TEST_METRIC, [$host=1.2.3.4], 3);
Metrics::add_data(TEST_METRIC, [$host=6.5.4.3], 2);
Metrics::add_data(TEST_METRIC, [$host=7.2.1.5], 1);
}
}

View file

@ -0,0 +1,79 @@
# Needs perftools support.
#
# @TEST-REQUIRES: bro --help 2>&1 | grep -q mem-leaks
#
# @TEST-EXEC: btest-bg-run sender HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local bro -m --pseudo-realtime %INPUT ../sender.bro
# @TEST-EXEC: sleep 1
# @TEST-EXEC: btest-bg-run receiver HEAP_CHECK_DUMP_DIRECTORY=. HEAPCHECK=local bro -m --pseudo-realtime %INPUT ../receiver.bro
# @TEST-EXEC: sleep 1
# @TEST-EXEC: btest-bg-wait -k 10
# @TEST-EXEC: btest-diff sender/test.log
# @TEST-EXEC: btest-diff sender/test.failure.log
# @TEST-EXEC: btest-diff sender/test.success.log
# @TEST-EXEC: cmp receiver/test.log sender/test.log
# @TEST-EXEC: cmp receiver/test.failure.log sender/test.failure.log
# @TEST-EXEC: cmp receiver/test.success.log sender/test.success.log
# This is the common part loaded by both sender and receiver.
module Test;
export {
# Create a new ID for our log stream
redef enum Log::ID += { LOG };
# Define a record with all the columns the log file can have.
# (I'm using a subset of fields from ssh-ext for demonstration.)
type Log: record {
t: time;
id: conn_id; # Will be rolled out into individual columns.
status: string &optional;
country: string &default="unknown";
} &log;
}
# Common setup for sender and receiver: creates the Test::LOG stream using
# the Log record as its columns, and adds filter "f1", which writes to path
# "test.success" only those records whose status equals "success".
event bro_init()
{
Log::create_stream(Test::LOG, [$columns=Log]);
Log::add_filter(Test::LOG, [$name="f1", $path="test.success", $pred=function(rec: Log): bool { return rec$status == "success"; }]);
}
#####
@TEST-START-FILE sender.bro
module Test;
@load frameworks/communication/listen
# Filter predicate: true for any record whose status is not "success".
function fail(rec: Log): bool
{
return rec$status != "success";
}
# Once the handshake with a peer has completed: adds filter "f2" (path
# "test.failure", predicate fail), writes five records to Test::LOG that
# share one connection ID, and then disconnects the peer.
event remote_connection_handshake_done(p: event_peer)
{
Log::add_filter(Test::LOG, [$name="f2", $path="test.failure", $pred=fail]);
local cid = [$orig_h=1.2.3.4, $orig_p=1234/tcp, $resp_h=2.3.4.5, $resp_p=80/tcp];
# The first record sets no country, so the field's &default applies.
local r: Log = [$t=network_time(), $id=cid, $status="success"];
# Log something.
Log::write(Test::LOG, r);
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="US"]);
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="UK"]);
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="success", $country="BR"]);
Log::write(Test::LOG, [$t=network_time(), $id=cid, $status="failure", $country="MX"]);
disconnect(p);
}
@TEST-END-FILE
@TEST-START-FILE receiver.bro
#####
redef Communication::nodes += {
["foo"] = [$host = 127.0.0.1, $connect=T, $request_logs=T]
};
@TEST-END-FILE

View file

@ -7,4 +7,4 @@
cat $1 | sed "s#bro *\"\./#../../../build/src/bro \".tmp/$TEST_NAME/#g" | sed 's/ *--gv//g' >$1.tmp && mv $1.tmp $1
grep -q "No leaks found" $1
grep -qv "detected leaks of" $1