now the writer supports tables and vectors.

still not tested, but using Log::default_writer=Log::WRITER_SQLITE seems to generate all
the right log-databases, etc.
This commit is contained in:
Bernhard Amann 2012-06-14 15:54:22 -07:00
parent 0a439b6b61
commit c664c40ac2
5 changed files with 145 additions and 19 deletions

View file

@ -2,3 +2,4 @@
@load ./postprocessors
@load ./writers/ascii
@load ./writers/dataseries
@load ./writers/sqlite

View file

@ -0,0 +1,10 @@
##! Interface for the SQLite log writer. Redefinable options are available
##! to tweak the output format of the SQLite reader.
module LogSQLite;
export {
## Separator between set elements.
const set_separator = "," &redef;
}

View file

@ -81,3 +81,8 @@ const extent_size: count;
const dump_schema: bool;
const use_integer_for_time: bool;
const num_threads: count;
module LogSQLite;
const set_separator: string;

View file

@ -23,6 +23,11 @@ using threading::Field;
SQLite::SQLite(WriterFrontend* frontend) : WriterBackend(frontend)
{
set_separator_len = BifConst::LogSQLite::set_separator->Len();
set_separator = new char[set_separator_len];
memcpy(set_separator, BifConst::LogSQLite::set_separator->Bytes(),
set_separator_len);
db = 0;
}
@ -78,12 +83,7 @@ string SQLite::GetTableType(int arg_type, int arg_subtype) {
case TYPE_TABLE:
case TYPE_VECTOR:
// nope, we do simply not support this at the moment. SQLite does not support array types and that would mean
// that this module has to roll everything into a string and an importer has to do the reverse. And that is bad-bad-bad
// for a relational database
InternalError("Table types are not supported by SQLite writer");
//type = "text"; // dirty - but sqlite does not directly support arrays. so - we just roll it into a ","-separated string I guess.
type = "text"; // dirty - but sqlite does not directly support arrays. so - we just roll it into a ","-separated string I guess.
//type = GetTableType(arg_subtype, 0) + "[]";
break;
@ -138,12 +138,6 @@ bool SQLite::DoInit(string path, int num_fields,
replace( fieldname.begin(), fieldname.end(), '.', '_' ); // sqlite does not like "." in row names.
create += fieldname;
if ( field->type == TYPE_TABLE || field->type == TYPE_VECTOR )
{
Error("Sorry, the SQLite writer does not support table and vector types");
return false;
}
string type = GetTableType(field->type, field->subtype);
create += " "+type;
@ -231,6 +225,84 @@ char* SQLite::FS(const char* format, ...) {
return buf;
}
// this one is mainly ripped from Ascii.cc - with some adaptions.
void SQLite::ValToAscii(ODesc* desc, Value* val)
{
if ( ! val->present )
{
assert(false);
}
switch ( val->type ) {
case TYPE_BOOL:
desc->Add(val->val.int_val ? "T" : "F");
break;
case TYPE_INT:
desc->Add(val->val.int_val);
break;
case TYPE_COUNT:
case TYPE_COUNTER:
desc->Add(val->val.uint_val);
break;
case TYPE_PORT:
desc->Add(val->val.port_val.port);
break;
case TYPE_SUBNET:
desc->Add(Render(val->val.subnet_val));
break;
case TYPE_ADDR:
desc->Add(Render(val->val.addr_val));
break;
case TYPE_DOUBLE:
// Rendering via Add() truncates trailing 0s after the
// decimal point. The difference with TIME/INTERVAL is mainly
// to keep the log format consistent.
desc->Add(val->val.double_val);
break;
case TYPE_INTERVAL:
case TYPE_TIME:
// Rendering via Render() keeps trailing 0s after the decimal
// point. The difference with DOUBLEis mainly to keep the log
// format consistent.
desc->Add(Render(val->val.double_val));
break;
case TYPE_ENUM:
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
{
int size = val->val.string_val->size();
const char* data = val->val.string_val->data();
if ( size )
desc->AddN(data, size);
break;
}
case TYPE_TABLE:
case TYPE_VECTOR:
assert(false);
// this would mean that we have a table/vector inside a table/vector.
// that is not possible and shoulr have been caught way earlier.
default:
// there may not be any types that we do not know here.
assert(false);
}
}
int SQLite::AddParams(Value* val, int pos)
{
@ -283,8 +355,42 @@ int SQLite::AddParams(Value* val, int pos)
}
case TYPE_TABLE:
{
ODesc desc;
desc.Clear();
desc.AddEscapeSequence(set_separator, set_separator_len);
for ( int j = 0; j < val->val.set_val.size; j++ )
{
if ( j > 0 )
desc.AddRaw(set_separator, set_separator_len);
ValToAscii(&desc, val->val.set_val.vals[j]);
}
return sqlite3_bind_text(st, pos, (const char*) desc.Bytes(), desc.Len(), SQLITE_TRANSIENT);
}
case TYPE_VECTOR:
// we do not support these, fallthrough
{
ODesc desc;
desc.Clear();
desc.AddEscapeSequence(set_separator, set_separator_len);
for ( int j = 0; j < val->val.vector_val.size; j++ )
{
if ( j > 0 )
desc.AddRaw(set_separator, set_separator_len);
ValToAscii(&desc, val->val.vector_val.vals[j]);
}
return sqlite3_bind_text(st, pos, (const char*) desc.Bytes(), desc.Len(), SQLITE_TRANSIENT);
}
default:
Error(Fmt("unsupported field format %d", val->type ));

View file

@ -35,6 +35,7 @@ protected:
private:
bool checkError(int code);
void ValToAscii(ODesc* desc, threading::Value* val);
int AddParams(threading::Value* val, int pos);
string GetTableType(int, int);
@ -42,6 +43,9 @@ private:
sqlite3 *db;
sqlite3_stmt *st;
char* set_separator;
int set_separator_len;
};
}