#ifndef THREADING_QUEUE_H
#define THREADING_QUEUE_H

#include <pthread.h>
#include <queue>
#include <deque>
#include <stdint.h>
#include <sys/time.h>

#include "Reporter.h"
#include "BasicThread.h"

#undef Queue // Defined elsewhere unfortunately.

namespace threading {

/**
 * A thread-safe single-reader single-writer queue.
 *
 * The implementation uses multiple queues and reads/writes in rotary fashion
 * in an attempt to limit contention.
 *
 * All Queue instances must be instantiated by Bro's main thread.
 *
 * TODO: Unclear how critical performance is for this queue. We could likely
 * optimize it further if helpful.
 */
template<typename T>
class Queue
{
public:
	/**
	 * Constructor.
	 *
	 * reader, writer: The corresponding threads. This is for checking
	 * whether they have terminated so that we can abort I/O operations.
	 * Can be left null for the main thread.
	 */
	Queue(BasicThread* arg_reader, BasicThread* arg_writer);

	/**
	 * Destructor.
	 */
	~Queue();

	/**
	 * Retrieves one element. This may block for a little while if no
	 * input is available and eventually return with a null element if
	 * nothing shows up.
	 */
	T Get();

	/**
	 * Queues one element.
	 */
	void Put(T data);

	/**
	 * Returns true if the next Get() operation will succeed.
	 */
	bool Ready();

	/**
	 * Returns true if the next Get() operation might succeed.
	 * This function may occasionally return a value that does not
	 * reflect the actual state, but it won't do so very often.
	 */
	bool MaybeReady() { return ( (read_ptr - write_ptr) != 0 ); }

	/**
	 * Wakes up the reader if it's currently blocked waiting for input.
	 * This is primarily to give it a chance to check for termination
	 * quickly.
	 */
	void WakeUp();

	/**
	 * Returns the number of queued items not yet retrieved.
	 */
	uint64_t Size();

	/**
	 * Statistics about inter-thread communication.
	 */
	struct Stats
		{
		uint64_t num_reads;	//! Number of messages read from the queue.
		uint64_t num_writes;	//! Number of messages written to the queue.
		};

	/**
	 * Returns statistics about the queue's usage.
	 *
	 * @param stats A pointer to a structure that will be filled with
	 * current numbers.
	 */
	void GetStats(Stats* stats);

private:
	static const int NUM_QUEUES = 8;

	pthread_mutex_t mutex[NUM_QUEUES];	// Mutexes protecting shared access.
	pthread_cond_t has_data[NUM_QUEUES];	// Signals when data becomes available.
	std::queue<T> messages[NUM_QUEUES];	// Actually holds the queued messages.

	int read_ptr;	// Where the next operation will read from.
	int write_ptr;	// Where the next operation will write to.

	BasicThread* reader;
	BasicThread* writer;

	// Statistics.
	uint64_t num_reads;
	uint64_t num_writes;
};

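// A minimal usage sketch, assuming a hypothetical "Message" payload type
// standing in for T and a producer thread "writer_thread" of type
// BasicThread*; the reading side here is the main thread and is therefore
// passed as null:
//
//	threading::Queue<Message*> queue(0, writer_thread);
//
//	// In the writer thread:
//	queue.Put(new Message(...));
//
//	// In the reader (main) thread:
//	while ( queue.Ready() )
//		{
//		Message* msg = queue.Get();	// may return 0 if nothing arrives in time
//		if ( msg )
//			process(msg);	// process() is likewise hypothetical
//		}
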
inline static void safe_lock(pthread_mutex_t* mutex)
	{
	if ( pthread_mutex_lock(mutex) != 0 )
		reporter->FatalErrorWithCore("cannot lock mutex");
	}

inline static void safe_unlock(pthread_mutex_t* mutex)
	{
	if ( pthread_mutex_unlock(mutex) != 0 )
		reporter->FatalErrorWithCore("cannot unlock mutex");
	}

template<typename T>
inline Queue<T>::Queue(BasicThread* arg_reader, BasicThread* arg_writer)
	{
	read_ptr = 0;
	write_ptr = 0;
	num_reads = num_writes = 0;
	reader = arg_reader;
	writer = arg_writer;

	for ( int i = 0; i < NUM_QUEUES; ++i )
		{
		if ( pthread_cond_init(&has_data[i], 0) != 0 )
			reporter->FatalError("cannot init queue condition variable");

		if ( pthread_mutex_init(&mutex[i], 0) != 0 )
			reporter->FatalError("cannot init queue mutex");
		}
	}

template<typename T>
inline Queue<T>::~Queue()
	{
	for ( int i = 0; i < NUM_QUEUES; ++i )
		{
		pthread_cond_destroy(&has_data[i]);
		pthread_mutex_destroy(&mutex[i]);
		}
	}

template<typename T>
inline T Queue<T>::Get()
	{
	if ( (reader && reader->Killed()) || (writer && writer->Killed()) )
		return 0;

	safe_lock(&mutex[read_ptr]);

	int old_read_ptr = read_ptr;

	if ( messages[read_ptr].empty() )
		{
		struct timespec ts;
		ts.tv_sec = time(0) + 5;
		ts.tv_nsec = 0;

		pthread_cond_timedwait(&has_data[read_ptr], &mutex[read_ptr], &ts);
		safe_unlock(&mutex[read_ptr]);
		return 0;
		}

	T data = messages[read_ptr].front();
	messages[read_ptr].pop();

	read_ptr = (read_ptr + 1) % NUM_QUEUES;
	++num_reads;

	safe_unlock(&mutex[old_read_ptr]);

	return data;
	}

template<typename T>
inline void Queue<T>::Put(T data)
	{
	safe_lock(&mutex[write_ptr]);

	int old_write_ptr = write_ptr;

	bool need_signal = messages[write_ptr].empty();

	messages[write_ptr].push(data);

	if ( need_signal )
		pthread_cond_signal(&has_data[write_ptr]);

	write_ptr = (write_ptr + 1) % NUM_QUEUES;
	++num_writes;

	safe_unlock(&mutex[old_write_ptr]);
	}


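// The Get()/Put() pair above relies on there being a single reader and a
// single writer: both advance through the sub-queues in the same rotary
// order, so elements are retrieved in the order they were queued and FIFO
// semantics are preserved overall. Put() locks only mutex[write_ptr] and
// Get() only mutex[read_ptr], so the two threads contend only when both
// indices happen to land on the same sub-queue.
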
template<typename T>
inline bool Queue<T>::Ready()
	{
	safe_lock(&mutex[read_ptr]);

	bool ret = (messages[read_ptr].size());

	safe_unlock(&mutex[read_ptr]);

	return ret;
	}

template<typename T>
inline uint64_t Queue<T>::Size()
	{
	// Need to lock all queues.
	for ( int i = 0; i < NUM_QUEUES; i++ )
		safe_lock(&mutex[i]);

	uint64_t size = 0;

	for ( int i = 0; i < NUM_QUEUES; i++ )
		size += messages[i].size();

	for ( int i = 0; i < NUM_QUEUES; i++ )
		safe_unlock(&mutex[i]);

	return size;
	}

template<typename T>
inline void Queue<T>::GetStats(Stats* stats)
	{
	// To be safe, we lock all queues. That's probably unnecessary, but
	// doesn't really hurt.
	for ( int i = 0; i < NUM_QUEUES; i++ )
		safe_lock(&mutex[i]);

	stats->num_reads = num_reads;
	stats->num_writes = num_writes;

	for ( int i = 0; i < NUM_QUEUES; i++ )
		safe_unlock(&mutex[i]);
	}

template<typename T>
inline void Queue<T>::WakeUp()
	{
	for ( int i = 0; i < NUM_QUEUES; i++ )
		{
		safe_lock(&mutex[i]);
		pthread_cond_signal(&has_data[i]);
		safe_unlock(&mutex[i]);
		}
	}

}


#endif