add streaming reads & automatic re-reading of files to ascii reader.

completely untested, but compiles & old tests still work
This commit is contained in:
Bernhard Amann 2012-02-16 15:40:07 -08:00
parent 91943c2655
commit d21a450f36
3 changed files with 57 additions and 21 deletions

View file

@ -789,13 +789,6 @@ int Manager::SendEntryTable(const ReaderFrontend* reader, const int id, const Va
InputHash *h = filter->lastDict->Lookup(idxhash);
if ( h != 0 ) {
// seen before
valhash->Hash();
h->valhash->Hash();
if ( filter->num_val_fields == 0 || h->valhash->Hash() == valhash->Hash() ) {
// ok, exact duplicate
filter->lastDict->Remove(idxhash);

View file

@ -10,6 +10,11 @@
#define MANUAL 0
#define REREAD 1
#define STREAM 2
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
using namespace input::reader;
using threading::Value;
@ -84,6 +89,7 @@ bool Ascii::DoInit(string path, int arg_mode)
started = false;
fname = path;
mode = arg_mode;
mtime = 0;
file = new ifstream(path.c_str());
if ( !file->is_open() ) {
@ -91,7 +97,7 @@ bool Ascii::DoInit(string path, int arg_mode)
return false;
}
if ( ( mode != MANUAL ) && (mode != REREAD) ) {
if ( ( mode != MANUAL ) && (mode != REREAD) && ( mode != STREAM ) ) {
Error(Fmt("Unsupported read mode %d for source %s", mode, path.c_str()));
return false;
}
@ -402,23 +408,58 @@ Value* Ascii::EntryToVal(string s, FieldMapping field) {
// read the entire file and send appropriate thingies back to InputMgr
bool Ascii::DoUpdate() {
// dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad)
if ( file && file->is_open() ) {
file->close();
}
file = new ifstream(fname.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
return false;
switch ( mode ) {
case REREAD:
// check if the file has changed
struct stat sb;
if ( stat(fname.c_str(), &sb) == -1 ) {
Error(Fmt("Could not get stat for %s", fname.c_str()));
return false;
}
if ( sb.st_mtime <= mtime ) {
// no change
return true;
}
mtime = sb.st_mtime;
// file changed. reread.
// fallthrough
case MANUAL:
case STREAM:
// dirty, fix me. (well, apparently after trying seeking, etc - this is not that bad)
if ( file && file->is_open() ) {
if ( mode == STREAM ) {
file->clear(); // remove end of file evil bits
break;
}
file->close();
}
file = new ifstream(fname.c_str());
if ( !file->is_open() ) {
Error(Fmt("cannot open %s", fname.c_str()));
return false;
}
if ( ReadHeader() == false ) {
return false;
}
break;
default:
assert(false);
}
//
// file->seekg(0, ios::beg); // do not forget clear.
if ( ReadHeader() == false ) {
return false;
}
string line;
while ( GetLine(line ) ) {
@ -504,8 +545,9 @@ bool Ascii::DoHeartbeat(double network_time, double current_time)
// yay, we do nothing :)
break;
case REREAD:
case STREAM:
DoUpdate();
break;
default:
assert(false);
}

View file

@ -91,6 +91,7 @@ private:
int mode;
bool started;
time_t mtime;
};