Moving the ASCII writer over to use UNIX I/O rather than stdio.

This commit is contained in:
Robin Sommer 2012-07-17 19:02:36 -07:00
parent f6b883bafc
commit e90918aa50
4 changed files with 24 additions and 24 deletions

View file

@ -15,7 +15,7 @@ using threading::Field;
Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend) Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
{ {
file = 0; fd = 0;
ascii_done = false; ascii_done = false;
output_to_stdout = BifConst::LogAscii::output_to_stdout; output_to_stdout = BifConst::LogAscii::output_to_stdout;
@ -53,9 +53,8 @@ Ascii::Ascii(WriterFrontend* frontend) : WriterBackend(frontend)
Ascii::~Ascii() Ascii::~Ascii()
{ {
// Normally, the file will be closed here already via the Finish() // Normally, the file will be closed here already via the Finish()
// message. But when we terminate abnormally, we may still have it // message. But when we terminate abnormally, we may still have it open.
// open. if ( fd )
if ( file )
CloseFile(0); CloseFile(0);
delete [] separator; delete [] separator;
@ -70,23 +69,25 @@ bool Ascii::WriteHeaderField(const string& key, const string& val)
string str = string(meta_prefix, meta_prefix_len) + string str = string(meta_prefix, meta_prefix_len) +
key + string(separator, separator_len) + val + "\n"; key + string(separator, separator_len) + val + "\n";
return (fwrite(str.c_str(), str.length(), 1, file) == 1); return safe_write(fd, str.c_str(), str.length());
} }
void Ascii::CloseFile(double t) void Ascii::CloseFile(double t)
{ {
if ( ! file ) if ( ! fd)
return; return;
if ( include_meta ) if ( include_meta )
WriteHeaderField("end", t ? Timestamp(t) : "<abnormal termination>"); WriteHeaderField("end", t ? Timestamp(t) : "<abnormal termination>");
fclose(file); close(fd);
file = 0; fd = 0;
} }
bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields) bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const * fields)
{ {
assert(! fd);
string path = info.path; string path = info.path;
if ( output_to_stdout ) if ( output_to_stdout )
@ -94,11 +95,13 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
fname = IsSpecial(path) ? path : path + "." + LogExt(); fname = IsSpecial(path) ? path : path + "." + LogExt();
if ( ! (file = fopen(fname.c_str(), "w")) ) fd = open(fname.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0777);
if ( fd < 0 )
{ {
Error(Fmt("cannot open %s: %s", fname.c_str(), Error(Fmt("cannot open %s: %s", fname.c_str(),
Strerror(errno))); Strerror(errno)));
fd = 0;
return false; return false;
} }
@ -112,7 +115,7 @@ bool Ascii::DoInit(const WriterInfo& info, int num_fields, const Field* const *
+ get_escaped_string(string(separator, separator_len), false) + get_escaped_string(string(separator, separator_len), false)
+ "\n"; + "\n";
if( fwrite(str.c_str(), str.length(), 1, file) != 1 ) if ( ! safe_write(fd, str.c_str(), str.length()) )
goto write_error; goto write_error;
if ( ! (WriteHeaderField("set_separator", get_escaped_string( if ( ! (WriteHeaderField("set_separator", get_escaped_string(
@ -151,7 +154,7 @@ write_error:
bool Ascii::DoFlush(double network_time) bool Ascii::DoFlush(double network_time)
{ {
fflush(file); fsync(fd);
return true; return true;
} }
@ -318,7 +321,7 @@ bool Ascii::DoWriteOne(ODesc* desc, Value* val, const Field* field)
bool Ascii::DoWrite(int num_fields, const Field* const * fields, bool Ascii::DoWrite(int num_fields, const Field* const * fields,
Value** vals) Value** vals)
{ {
if ( ! file ) if ( ! fd )
DoInit(Info(), NumFields(), Fields()); DoInit(Info(), NumFields(), Fields());
desc.Clear(); desc.Clear();
@ -337,24 +340,23 @@ bool Ascii::DoWrite(int num_fields, const Field* const * fields,
const char* bytes = (const char*)desc.Bytes(); const char* bytes = (const char*)desc.Bytes();
int len = desc.Len(); int len = desc.Len();
// Make sure the line doesn't look like meta information.
if ( strncmp(bytes, meta_prefix, meta_prefix_len) == 0 ) if ( strncmp(bytes, meta_prefix, meta_prefix_len) == 0 )
{ {
// It would so escape the first character. // It would so escape the first character.
char buf[16]; char buf[16];
snprintf(buf, sizeof(buf), "\\x%02x", bytes[0]); snprintf(buf, sizeof(buf), "\\x%02x", bytes[0]);
if ( fwrite(buf, strlen(buf), 1, file) != 1 ) if ( ! safe_write(fd, buf, strlen(buf)) )
goto write_error; goto write_error;
++bytes; ++bytes;
--len; --len;
} }
if ( fwrite(bytes, len, 1, file) != 1 ) if ( ! safe_write(fd, bytes, len) )
goto write_error; goto write_error;
if ( IsBuf() ) if ( IsBuf() )
fflush(file); fsync(fd);
return true; return true;
@ -366,7 +368,7 @@ write_error:
bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating) bool Ascii::DoRotate(string rotated_path, double open, double close, bool terminating)
{ {
// Don't rotate special files or if there's not one currently open. // Don't rotate special files or if there's not one currently open.
if ( ! file || IsSpecial(Info().path) ) if ( ! fd || IsSpecial(Info().path) )
return true; return true;
CloseFile(close); CloseFile(close);

View file

@ -37,7 +37,7 @@ private:
void CloseFile(double t); void CloseFile(double t);
string Timestamp(double t); string Timestamp(double t);
FILE* file; int fd;
string fname; string fname;
ODesc desc; ODesc desc;
bool ascii_done; bool ascii_done;

View file

@ -87,18 +87,17 @@ const char* BasicThread::Strerror(int err)
void BasicThread::Start() void BasicThread::Start()
{ {
if ( started ) if ( started )
return; return;
started = true;
int err = pthread_create(&pthread, 0, BasicThread::launcher, this); int err = pthread_create(&pthread, 0, BasicThread::launcher, this);
if ( err != 0 ) if ( err != 0 )
reporter->FatalError("Cannot create thread %s:%s", name.c_str(), Strerror(err)); reporter->FatalError("Cannot create thread %s:%s", name.c_str(), Strerror(err));
DBG_LOG(DBG_THREADING, "Started thread %s", name.c_str()); DBG_LOG(DBG_THREADING, "Started thread %s", name.c_str());
started = true;
OnStart(); OnStart();
} }

View file

@ -1292,7 +1292,6 @@ uint64 calculate_unique_id(size_t pool)
bool safe_write(int fd, const char* data, int len) bool safe_write(int fd, const char* data, int len)
{ {
return true;
while ( len > 0 ) while ( len > 0 )
{ {
int n = write(fd, data, len); int n = write(fd, data, len);