Further reworking the thread API.

This commit is contained in:
Robin Sommer 2012-07-17 17:09:49 -07:00
parent f7a6407ab1
commit f6b883bafc
7 changed files with 36 additions and 13 deletions

View file

@ -141,9 +141,6 @@ void WriterFrontend::Stop()
{ {
FlushWriteBuffer(); FlushWriteBuffer();
SetDisable(); SetDisable();
if ( backend )
backend->Stop();
} }
void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields) void WriterFrontend::Init(const WriterBackend::WriterInfo& arg_info, int arg_num_fields, const Field* const * arg_fields)

View file

@ -50,7 +50,7 @@ public:
/** /**
* Stops all output to this writer. Calling this methods disables all * Stops all output to this writer. Calling this methods disables all
* message forwarding to the backend and stops the backend thread. * message forwarding to the backend.
* *
* This method must only be called from the main thread. * This method must only be called from the main thread.
*/ */

View file

@ -152,12 +152,13 @@ MsgThread::MsgThread() : BasicThread()
{ {
cnt_sent_in = cnt_sent_out = 0; cnt_sent_in = cnt_sent_out = 0;
finished = false; finished = false;
stopped = false;
thread_mgr->AddMsgThread(this); thread_mgr->AddMsgThread(this);
} }
void MsgThread::OnStop() void MsgThread::OnStop()
{ {
if ( finished ) if ( stopped )
return; return;
// Signal thread to terminate and wait until it has acknowledged. // Signal thread to terminate and wait until it has acknowledged.
@ -303,13 +304,8 @@ BasicInputMessage* MsgThread::RetrieveIn()
void MsgThread::Run() void MsgThread::Run()
{ {
while ( true ) while ( ! finished )
{ {
// When requested to terminate, we only do so when
// all input has been processed.
if ( Terminating() && ! queue_in.Ready() )
break;
BasicInputMessage* msg = RetrieveIn(); BasicInputMessage* msg = RetrieveIn();
bool result = msg->Process(); bool result = msg->Process();
@ -318,12 +314,13 @@ void MsgThread::Run()
{ {
string s = msg->Name() + " failed, terminating thread (MsgThread)"; string s = msg->Name() + " failed, terminating thread (MsgThread)";
Error(s.c_str()); Error(s.c_str());
Stop();
break; break;
} }
delete msg; delete msg;
} }
Finished();
} }
void MsgThread::GetStats(Stats* stats) void MsgThread::GetStats(Stats* stats)

View file

@ -293,6 +293,7 @@ private:
uint64_t cnt_sent_out; // Counts message sent by child. uint64_t cnt_sent_out; // Counts message sent by child.
bool finished; // Set to true by Finished message. bool finished; // Set to true by Finished message.
bool stopped; // Set to true by OnStop().
}; };
/** /**

View file

@ -1290,6 +1290,28 @@ uint64 calculate_unique_id(size_t pool)
return HashKey::HashBytes(&(uid_pool[pool].key), sizeof(uid_pool[pool].key)); return HashKey::HashBytes(&(uid_pool[pool].key), sizeof(uid_pool[pool].key));
} }
bool safe_write(int fd, const char* data, int len)
{
return true;
while ( len > 0 )
{
int n = write(fd, data, len);
if ( n < 0 )
{
if ( errno == EINTR )
continue;
return false;
}
data += n;
len -= n;
}
return true;
}
void out_of_memory(const char* where) void out_of_memory(const char* where)
{ {
reporter->FatalError("out of memory in %s.\n", where); reporter->FatalError("out of memory in %s.\n", where);

View file

@ -289,6 +289,11 @@ inline size_t pad_size(size_t size)
#define padded_sizeof(x) (pad_size(sizeof(x))) #define padded_sizeof(x) (pad_size(sizeof(x)))
// Like write() but handles interrupted system calls by restarting. Returns
// true if the write was successful, otherwise sets errno. This function is
// thread-safe as long as no two threads write to the same descriptor.
extern bool safe_write(int fd, const char* data, int len);
extern void out_of_memory(const char* where); extern void out_of_memory(const char* where);
inline void* safe_realloc(void* ptr, size_t size) inline void* safe_realloc(void* ptr, size_t size)
@ -338,4 +343,5 @@ inline int safe_vsnprintf(char* str, size_t size, const char* format, va_list al
// handed out by malloc. // handed out by malloc.
extern void get_memory_usage(unsigned int* total, extern void get_memory_usage(unsigned int* total,
unsigned int* malloced); unsigned int* malloced);
#endif #endif

View file

@ -2,4 +2,4 @@
# #
# Default canonifier used with the tests in testing/btest/*. # Default canonifier used with the tests in testing/btest/*.
`dirname $0`/diff-remove-timestamps `dirname $0`/diff-remove-timestamps | grep -v XXX