mirror of
https://github.com/zeek/zeek.git
synced 2025-10-05 16:18:19 +00:00
Extending queue statistics.
This commit is contained in:
parent
e3f5cbb670
commit
d7c9471818
6 changed files with 54 additions and 4 deletions
|
@ -44,7 +44,7 @@ event bro_init() &priority=9
|
||||||
{
|
{
|
||||||
if ( n$node_type == WORKER && n$proxy == node )
|
if ( n$node_type == WORKER && n$proxy == node )
|
||||||
Communication::nodes[i] =
|
Communication::nodes[i] =
|
||||||
[$host=n$ip, $connect=F, $class=i, $sync=T, $auth=T, $events=worker2proxy_events];
|
[$host=n$ip, $connect=F, $class=i, $sync=F, $auth=T, $events=worker2proxy_events];
|
||||||
|
|
||||||
# accepts connections from the previous one.
|
# accepts connections from the previous one.
|
||||||
# (This is not ideal for setups with many proxies)
|
# (This is not ideal for setups with many proxies)
|
||||||
|
|
|
@ -210,11 +210,16 @@ void ProfileLogger::Log()
|
||||||
i != thread_stats.end(); ++i )
|
i != thread_stats.end(); ++i )
|
||||||
{
|
{
|
||||||
threading::MsgThread::Stats s = i->second;
|
threading::MsgThread::Stats s = i->second;
|
||||||
file->Write(fmt("%0.6f %-25s in=%" PRIu64 " out=%" PRIu64 " pending=%" PRIu64 "/%" PRIu64 "\n",
|
file->Write(fmt("%0.6f %-25s in=%" PRIu64 " out=%" PRIu64 " pending=%" PRIu64 "/%" PRIu64
|
||||||
|
" (#queue r/w: in=%" PRIu64 "/%" PRIu64 " out=%" PRIu64 "/%" PRIu64 ")"
|
||||||
|
"\n",
|
||||||
network_time,
|
network_time,
|
||||||
i->first.c_str(),
|
i->first.c_str(),
|
||||||
s.sent_in, s.sent_out,
|
s.sent_in, s.sent_out,
|
||||||
s.pending_in, s.pending_out));
|
s.pending_in, s.pending_out,
|
||||||
|
s.queue_in_stats.num_reads, s.queue_in_stats.num_writes,
|
||||||
|
s.queue_out_stats.num_reads, s.queue_out_stats.num_writes
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Script-level state.
|
// Script-level state.
|
||||||
|
|
|
@ -212,7 +212,7 @@ protected:
|
||||||
const threading::Field* const* fields; // The log fields.
|
const threading::Field* const* fields; // The log fields.
|
||||||
|
|
||||||
// Buffer for bulk writes.
|
// Buffer for bulk writes.
|
||||||
static const int WRITER_BUFFER_SIZE = 50;
|
static const int WRITER_BUFFER_SIZE = 1000;
|
||||||
int write_buffer_pos; // Position of next write in buffer.
|
int write_buffer_pos; // Position of next write in buffer.
|
||||||
threading::Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE.
|
threading::Value*** write_buffer; // Buffer of size WRITER_BUFFER_SIZE.
|
||||||
};
|
};
|
||||||
|
|
|
@ -283,5 +283,7 @@ void MsgThread::GetStats(Stats* stats)
|
||||||
stats->sent_out = cnt_sent_out;
|
stats->sent_out = cnt_sent_out;
|
||||||
stats->pending_in = queue_in.Size();
|
stats->pending_in = queue_in.Size();
|
||||||
stats->pending_out = queue_out.Size();
|
stats->pending_out = queue_out.Size();
|
||||||
|
queue_in.GetStats(&stats->queue_in_stats);
|
||||||
|
queue_out.GetStats(&stats->queue_out_stats);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -154,6 +154,10 @@ public:
|
||||||
uint64_t sent_out; //! Number of messages sent from the child thread to the main thread
|
uint64_t sent_out; //! Number of messages sent from the child thread to the main thread
|
||||||
uint64_t pending_in; //! Number of messages sent to the child but not yet processed.
|
uint64_t pending_in; //! Number of messages sent to the child but not yet processed.
|
||||||
uint64_t pending_out; //! Number of messages sent from the child but not yet processed by the main thread.
|
uint64_t pending_out; //! Number of messages sent from the child but not yet processed by the main thread.
|
||||||
|
|
||||||
|
/// Statistics from our queues.
|
||||||
|
Queue<BasicInputMessage *>::Stats queue_in_stats;
|
||||||
|
Queue<BasicOutputMessage *>::Stats queue_out_stats;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -58,6 +58,22 @@ public:
|
||||||
*/
|
*/
|
||||||
uint64_t Size();
|
uint64_t Size();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Statistics about inter-thread communication.
|
||||||
|
*/
|
||||||
|
struct Stats
|
||||||
|
{
|
||||||
|
uint64_t num_reads; //! Number of messages read from the queue.
|
||||||
|
uint64_t num_writes; //! Number of messages written to the queue.
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns statistics about the queue's usage.
|
||||||
|
*
|
||||||
|
* @param stats A pointer to a structure that will be filled with
|
||||||
|
* current numbers. */
|
||||||
|
void GetStats(Stats* stats);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int NUM_QUEUES = 8;
|
static const int NUM_QUEUES = 8;
|
||||||
|
|
||||||
|
@ -67,6 +83,10 @@ private:
|
||||||
|
|
||||||
int read_ptr; // Where the next operation will read from
|
int read_ptr; // Where the next operation will read from
|
||||||
int write_ptr; // Where the next operation will write to
|
int write_ptr; // Where the next operation will write to
|
||||||
|
|
||||||
|
// Statistics.
|
||||||
|
uint64_t num_reads;
|
||||||
|
uint64_t num_writes;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline static void safe_lock(pthread_mutex_t* mutex)
|
inline static void safe_lock(pthread_mutex_t* mutex)
|
||||||
|
@ -86,6 +106,7 @@ inline Queue<T>::Queue()
|
||||||
{
|
{
|
||||||
read_ptr = 0;
|
read_ptr = 0;
|
||||||
write_ptr = 0;
|
write_ptr = 0;
|
||||||
|
num_reads = num_writes = 0;
|
||||||
|
|
||||||
for( int i = 0; i < NUM_QUEUES; ++i )
|
for( int i = 0; i < NUM_QUEUES; ++i )
|
||||||
{
|
{
|
||||||
|
@ -121,6 +142,7 @@ inline T Queue<T>::Get()
|
||||||
messages[read_ptr].pop();
|
messages[read_ptr].pop();
|
||||||
|
|
||||||
read_ptr = (read_ptr + 1) % NUM_QUEUES;
|
read_ptr = (read_ptr + 1) % NUM_QUEUES;
|
||||||
|
++num_reads;
|
||||||
|
|
||||||
safe_unlock(&mutex[old_read_ptr]);
|
safe_unlock(&mutex[old_read_ptr]);
|
||||||
|
|
||||||
|
@ -142,6 +164,7 @@ inline void Queue<T>::Put(T data)
|
||||||
pthread_cond_signal(&has_data[write_ptr]);
|
pthread_cond_signal(&has_data[write_ptr]);
|
||||||
|
|
||||||
write_ptr = (write_ptr + 1) % NUM_QUEUES;
|
write_ptr = (write_ptr + 1) % NUM_QUEUES;
|
||||||
|
++num_writes;
|
||||||
|
|
||||||
safe_unlock(&mutex[old_write_ptr]);
|
safe_unlock(&mutex[old_write_ptr]);
|
||||||
}
|
}
|
||||||
|
@ -177,7 +200,23 @@ inline uint64_t Queue<T>::Size()
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<typename T>
|
||||||
|
inline void Queue<T>::GetStats(Stats* stats)
|
||||||
|
{
|
||||||
|
// To be safe, we look all queues. That's probably unneccessary, but
|
||||||
|
// doesn't really hurt.
|
||||||
|
for ( int i = 0; i < NUM_QUEUES; i++ )
|
||||||
|
safe_lock(&mutex[i]);
|
||||||
|
|
||||||
|
stats->num_reads = num_reads;
|
||||||
|
stats->num_writes = num_writes;
|
||||||
|
|
||||||
|
for ( int i = 0; i < NUM_QUEUES; i++ )
|
||||||
|
safe_unlock(&mutex[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue