Merge remote-tracking branch 'origin/topic/johanna/rawleak'

BIT-1594 #merged

* origin/topic/johanna/rawleak:
  Exec: fix reader cleanup when using read_files
  Raw Writer: First step - make code more c++11-y, remove raw pointers.
This commit is contained in:
Robin Sommer 2016-06-17 16:37:58 -07:00
commit 633dcab790
3 changed files with 44 additions and 55 deletions

View file

@ -116,7 +116,7 @@ event Input::end_of_data(orig_name: string, source:string)
if ( track_file !in result$files ) if ( track_file !in result$files )
result$files[track_file] = vector(); result$files[track_file] = vector();
Input::remove(name); Input::remove(orig_name);
if ( name !in pending_files ) if ( name !in pending_files )
delete pending_commands[name]; delete pending_commands[name];

View file

@ -26,10 +26,8 @@ using threading::Field;
const int Raw::block_size = 4096; // how big do we expect our chunks of data to be. const int Raw::block_size = 4096; // how big do we expect our chunks of data to be.
Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend) Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend), file(nullptr, fclose), stderrfile(nullptr, fclose)
{ {
file = 0;
stderrfile = 0;
execute = false; execute = false;
firstrun = true; firstrun = true;
mtime = 0; mtime = 0;
@ -40,8 +38,6 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
sep_length = BifConst::InputRaw::record_separator->Len(); sep_length = BifConst::InputRaw::record_separator->Len();
buf = 0;
outbuf = 0;
bufpos = 0; bufpos = 0;
stdin_fileno = fileno(stdin); stdin_fileno = fileno(stdin);
@ -61,13 +57,9 @@ Raw::~Raw()
void Raw::DoClose() void Raw::DoClose()
{ {
if ( file != 0 ) if ( file )
CloseInput(); CloseInput();
// Just throw away output that has not been flushed.
delete [] buf;
buf = 0;
if ( execute && childpid > 0 && kill(childpid, 0) == 0 ) if ( execute && childpid > 0 && kill(childpid, 0) == 0 )
{ {
// Kill child process group. // Kill child process group.
@ -255,7 +247,7 @@ bool Raw::Execute()
else else
ClosePipeEnd(stderr_in); ClosePipeEnd(stderr_in);
file = fdopen(pipes[stdout_in], "r"); file = std::unique_ptr<FILE, int(*)(FILE*)>(fdopen(pipes[stdout_in], "r"), fclose);
if ( ! file ) if ( ! file )
{ {
@ -267,7 +259,7 @@ bool Raw::Execute()
if ( use_stderr ) if ( use_stderr )
{ {
stderrfile = fdopen(pipes[stderr_in], "r"); stderrfile = std::unique_ptr<FILE, int(*)(FILE*)>(fdopen(pipes[stderr_in], "r"), fclose);
if ( ! stderrfile ) if ( ! stderrfile )
{ {
@ -289,14 +281,14 @@ bool Raw::OpenInput()
else else
{ {
file = fopen(fname.c_str(), "r"); file = std::unique_ptr<FILE, int(*)(FILE*)>(fopen(fname.c_str(), "r"), fclose);
if ( ! file ) if ( ! file )
{ {
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
return false; return false;
} }
if ( ! SetFDFlags(fileno(file), F_SETFD, FD_CLOEXEC) ) if ( ! SetFDFlags(fileno(file.get()), F_SETFD, FD_CLOEXEC) )
Warning(Fmt("Init: cannot set close-on-exec for %s", fname.c_str())); Warning(Fmt("Init: cannot set close-on-exec for %s", fname.c_str()));
} }
@ -305,7 +297,7 @@ bool Raw::OpenInput()
int whence = (offset >= 0) ? SEEK_SET : SEEK_END; int whence = (offset >= 0) ? SEEK_SET : SEEK_END;
int64_t pos = (offset >= 0) ? offset : offset + 1; // we want -1 to be the end of the file int64_t pos = (offset >= 0) ? offset : offset + 1; // we want -1 to be the end of the file
if ( fseek(file, pos, whence) < 0 ) if ( fseek(file.get(), pos, whence) < 0 )
{ {
char buf[256]; char buf[256];
strerror_r(errno, buf, sizeof(buf)); strerror_r(errno, buf, sizeof(buf));
@ -318,7 +310,7 @@ bool Raw::OpenInput()
bool Raw::CloseInput() bool Raw::CloseInput()
{ {
if ( file == 0 ) if ( ! file )
{ {
InternalWarning(Fmt("Trying to close closed file for stream %s", InternalWarning(Fmt("Trying to close closed file for stream %s",
fname.c_str())); fname.c_str()));
@ -328,10 +320,10 @@ bool Raw::CloseInput()
Debug(DBG_INPUT, "Raw reader starting close"); Debug(DBG_INPUT, "Raw reader starting close");
#endif #endif
fclose(file); file.reset(nullptr);
if ( use_stderr ) if ( use_stderr )
fclose(stderrfile); stderrfile.reset(nullptr);
if ( execute ) if ( execute )
{ {
@ -339,9 +331,6 @@ bool Raw::CloseInput()
ClosePipeEnd(i); ClosePipeEnd(i);
} }
file = 0;
stderrfile = 0;
#ifdef DEBUG #ifdef DEBUG
Debug(DBG_INPUT, "Raw reader finished close"); Debug(DBG_INPUT, "Raw reader finished close");
#endif #endif
@ -455,14 +444,14 @@ int64_t Raw::GetLine(FILE* arg_file)
int pos = 0; // strstr_n only works on ints - so no use to use something different here int pos = 0; // strstr_n only works on ints - so no use to use something different here
int offset = 0; int offset = 0;
if ( buf == 0 ) if ( ! buf )
buf = new char[block_size]; buf = std::unique_ptr<char[]>(new char[block_size]);
int repeats = 1; int repeats = 1;
for ( ;; ) for ( ;; )
{ {
size_t readbytes = fread(buf+bufpos+offset, 1, block_size-bufpos, arg_file); size_t readbytes = fread(buf.get()+bufpos+offset, 1, block_size-bufpos, arg_file);
pos += bufpos + readbytes; pos += bufpos + readbytes;
//printf("Pos: %d\n", pos); //printf("Pos: %d\n", pos);
bufpos = offset = 0; // read full block size in next read... bufpos = offset = 0; // read full block size in next read...
@ -473,7 +462,7 @@ int64_t Raw::GetLine(FILE* arg_file)
// researching everything each time is a bit... cpu-intensive. But otherwhise we have // researching everything each time is a bit... cpu-intensive. But otherwhise we have
// to deal with situations where the separator is multi-character and split over multiple // to deal with situations where the separator is multi-character and split over multiple
// reads... // reads...
int found = strstr_n(pos, (unsigned char*) buf, separator.size(), (unsigned char*) separator.c_str()); int found = strstr_n(pos, (unsigned char*) buf.get(), separator.size(), (unsigned char*) separator.c_str());
if ( found == -1 ) if ( found == -1 )
{ {
@ -485,30 +474,27 @@ int64_t Raw::GetLine(FILE* arg_file)
return -1; // signal EOF - and that we had no more data. return -1; // signal EOF - and that we had no more data.
else else
{ {
outbuf = buf; outbuf = std::move(buf); // buf is null after this
buf = 0;
return pos; return pos;
} }
} }
repeats++; repeats++;
// bah, we cannot use realloc because we would have to change the delete in the manager to a free. // bah, we cannot use realloc because we would have to change the delete in the manager to a free.
char * newbuf = new char[block_size*repeats]; std::unique_ptr<char[]> newbuf = std::unique_ptr<char[]>(new char[block_size*repeats]);
memcpy(newbuf, buf, block_size*(repeats-1)); memcpy(newbuf.get(), buf.get(), block_size*(repeats-1));
delete [] buf; buf = std::move(newbuf);
buf = newbuf;
offset = block_size*(repeats-1); offset = block_size*(repeats-1);
} }
else else
{ {
outbuf = buf; outbuf = std::move(buf);
buf = 0;
if ( found < pos ) if ( found < pos )
{ {
// we have leftovers. copy them into the buffer for the next line // we have leftovers. copy them into the buffer for the next line
buf = new char[block_size]; buf = std::unique_ptr<char[]>(new char[block_size]);
memcpy(buf, outbuf + found + sep_length, pos - found - sep_length); memcpy(buf.get(), outbuf.get() + found + sep_length, pos - found - sep_length);
bufpos = pos - found - sep_length; bufpos = pos - found - sep_length;
} }
@ -586,9 +572,9 @@ bool Raw::DoUpdate()
case MODE_MANUAL: case MODE_MANUAL:
case MODE_STREAM: case MODE_STREAM:
if ( Info().mode == MODE_STREAM && file != 0 ) if ( Info().mode == MODE_STREAM && file )
{ {
clearerr(file); // remove end of file evil bits clearerr(file.get()); // remove end of file evil bits
break; break;
} }
@ -610,7 +596,7 @@ bool Raw::DoUpdate()
if ( stdin_towrite > 0 ) if ( stdin_towrite > 0 )
WriteToStdin(); WriteToStdin();
int64_t length = GetLine(file); int64_t length = GetLine(file.get());
//printf("Read %lld bytes\n", length); //printf("Read %lld bytes\n", length);
if ( length == -3 ) if ( length == -3 )
@ -624,7 +610,7 @@ bool Raw::DoUpdate()
// filter has exactly one text field. convert to it. // filter has exactly one text field. convert to it.
Value* val = new Value(TYPE_STRING, true); Value* val = new Value(TYPE_STRING, true);
val->val.string_val.data = outbuf; val->val.string_val.data = outbuf.release();
val->val.string_val.length = length; val->val.string_val.length = length;
fields[0] = val; fields[0] = val;
@ -636,15 +622,13 @@ bool Raw::DoUpdate()
} }
Put(fields); Put(fields);
outbuf = 0;
} }
if ( use_stderr ) if ( use_stderr )
{ {
for ( ;; ) for ( ;; )
{ {
int64_t length = GetLine(stderrfile); int64_t length = GetLine(stderrfile.get());
//printf("Read stderr %lld bytes\n", length); //printf("Read stderr %lld bytes\n", length);
if ( length == -3 ) if ( length == -3 )
return false; return false;
@ -654,7 +638,7 @@ bool Raw::DoUpdate()
Value** fields = new Value*[2]; Value** fields = new Value*[2];
Value* val = new Value(TYPE_STRING, true); Value* val = new Value(TYPE_STRING, true);
val->val.string_val.data = outbuf; val->val.string_val.data = outbuf.release();
val->val.string_val.length = length; val->val.string_val.length = length;
fields[0] = val; fields[0] = val;
Value* bval = new Value(TYPE_BOOL, true); Value* bval = new Value(TYPE_BOOL, true);
@ -662,8 +646,6 @@ bool Raw::DoUpdate()
fields[1] = bval; fields[1] = bval;
Put(fields); Put(fields);
outbuf = 0;
} }
} }

View file

@ -5,6 +5,7 @@
#include <vector> #include <vector>
#include <pthread.h> #include <pthread.h>
#include <memory>
#include "input/ReaderBackend.h" #include "input/ReaderBackend.h"
@ -16,16 +17,22 @@ namespace input { namespace reader {
*/ */
class Raw : public ReaderBackend { class Raw : public ReaderBackend {
public: public:
Raw(ReaderFrontend* frontend); explicit Raw(ReaderFrontend* frontend);
~Raw(); ~Raw();
// prohibit copying and moving
Raw(const Raw&) = delete;
Raw(Raw&&) = delete;
Raw& operator=(const Raw&) = delete;
Raw& operator=(Raw&&) = delete;
static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); } static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new Raw(frontend); }
protected: protected:
virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields) override;
virtual void DoClose(); void DoClose() override;
virtual bool DoUpdate(); bool DoUpdate() override;
virtual bool DoHeartbeat(double network_time, double current_time); bool DoHeartbeat(double network_time, double current_time) override;
private: private:
void ClosePipeEnd(int i); void ClosePipeEnd(int i);
@ -40,8 +47,8 @@ private:
void WriteToStdin(); void WriteToStdin();
string fname; // Source with a potential "|" removed. string fname; // Source with a potential "|" removed.
FILE* file; std::unique_ptr<FILE, int(*)(FILE*)> file;
FILE* stderrfile; std::unique_ptr<FILE, int(*)(FILE*)> stderrfile;
bool execute; bool execute;
bool firstrun; bool firstrun;
time_t mtime; time_t mtime;
@ -51,8 +58,8 @@ private:
unsigned int sep_length; // length of the separator unsigned int sep_length; // length of the separator
int bufpos; int bufpos;
char* buf; std::unique_ptr<char[]> buf;
char* outbuf; std::unique_ptr<char[]> outbuf;
int stdin_fileno; int stdin_fileno;
int stdout_fileno; int stdout_fileno;