make sqlite-writer more stable.

This actually looks quite good...
This commit is contained in:
Bernhard Amann 2013-01-15 11:48:47 -08:00
parent 96aa8776d3
commit d843297a97

View file

@ -1,20 +1,16 @@
// See the file "COPYING" in the main distribution directory for copyright. // See the file "COPYING" in the main distribution directory for copyright.
#include "config.h" #include "config.h"
#ifdef USE_SQLITE #ifdef USE_SQLITE
#include <string> #include <string>
#include <errno.h> #include <errno.h>
#include <vector>
#include "../../NetVar.h" #include "../../NetVar.h"
#include "../../threading/SerialTypes.h" #include "../../threading/SerialTypes.h"
#include <vector>
#include "SQLite.h" #include "SQLite.h"
using namespace logging; using namespace logging;
@ -24,19 +20,16 @@ using threading::Field;
SQLite::SQLite(WriterFrontend* frontend) : WriterBackend(frontend) SQLite::SQLite(WriterFrontend* frontend) : WriterBackend(frontend)
{ {
set_separator.assign( set_separator.assign(
(const char*) BifConst::LogSQLite::set_separator->Bytes(), (const char*) BifConst::LogSQLite::set_separator->Bytes(),
BifConst::LogAscii::set_separator->Len() BifConst::LogAscii::set_separator->Len()
); );
unset_field.assign( unset_field.assign(
(const char*) BifConst::LogSQLite::unset_field->Bytes(), (const char*) BifConst::LogSQLite::unset_field->Bytes(),
BifConst::LogAscii::unset_field->Len() BifConst::LogAscii::unset_field->Len()
); );
db = 0; db = 0;
io = new AsciiInputOutput(this, AsciiInputOutput::SeparatorInfo(set_separator, unset_field)); io = new AsciiInputOutput(this, AsciiInputOutput::SeparatorInfo(set_separator, unset_field));
@ -54,11 +47,9 @@ SQLite::~SQLite()
} }
string SQLite::GetTableType(int arg_type, int arg_subtype) { string SQLite::GetTableType(int arg_type, int arg_subtype) {
string type; string type;
switch ( arg_type ) { switch ( arg_type ) {
case TYPE_BOOL: case TYPE_BOOL:
type = "boolean"; type = "boolean";
break; break;
@ -66,16 +57,10 @@ string SQLite::GetTableType(int arg_type, int arg_subtype) {
case TYPE_INT: case TYPE_INT:
case TYPE_COUNT: case TYPE_COUNT:
case TYPE_COUNTER: case TYPE_COUNTER:
case TYPE_PORT: case TYPE_PORT: // note that we do not save the protocol at the moment. Just like in the case of the ascii-writer
type = "integer"; type = "integer";
break; break;
/*
case TYPE_PORT:
type = "VARCHAR(10)";
break;
*/
case TYPE_SUBNET: case TYPE_SUBNET:
case TYPE_ADDR: case TYPE_ADDR:
type = "text"; // sqlite3 does not have a type for internet addresses type = "text"; // sqlite3 does not have a type for internet addresses
@ -96,19 +81,18 @@ string SQLite::GetTableType(int arg_type, int arg_subtype) {
case TYPE_TABLE: case TYPE_TABLE:
case TYPE_VECTOR: case TYPE_VECTOR:
type = "text"; // dirty - but sqlite does not directly support arrays. so - we just roll it into a ","-separated string I guess. type = "text"; // dirty - but sqlite does not directly support arrays. so - we just roll it into a ","-separated string.
//type = GetTableType(arg_subtype, 0) + "[]";
break; break;
default: default:
Error(Fmt("unsupported field format %d ", arg_type)); Error(Fmt("unsupported field format %d ", arg_type));
return ""; return ""; // not the cleanest way to abort. But sqlite will complain on create table...
} }
return type; return type;
} }
// returns true true in case of error
bool SQLite::checkError( int code ) bool SQLite::checkError( int code )
{ {
if ( code != SQLITE_OK && code != SQLITE_DONE ) if ( code != SQLITE_OK && code != SQLITE_DONE )
@ -123,7 +107,11 @@ bool SQLite::checkError( int code )
bool SQLite::DoInit(const WriterInfo& info, int num_fields, bool SQLite::DoInit(const WriterInfo& info, int num_fields,
const Field* const * fields) const Field* const * fields)
{ {
if ( sqlite3_threadsafe() == 0 ) {
Error("SQLite reports that it is not threadsafe. Bro needs a threadsafe version of SQLite. Aborting");
return false;
}
string fullpath(info.path); string fullpath(info.path);
fullpath.append(".sqlite"); fullpath.append(".sqlite");
string dbname; string dbname;
@ -157,9 +145,15 @@ bool SQLite::DoInit(const WriterInfo& info, int num_fields,
if ( i != 0 ) if ( i != 0 )
create += ",\n"; create += ",\n";
string fieldname = fields[i]->name; // sadly sqlite3 has no other method for escaping stuff. That I know of.
replace( fieldname.begin(), fieldname.end(), '.', '_' ); // sqlite does not like "." in row names. char* fieldname = sqlite3_mprintf("%Q", fields[i]->name);
if ( fieldname == 0 )
{
InternalError("Could not malloc memory");
return false;
}
create += fieldname; create += fieldname;
sqlite3_free(fieldname);
string type = GetTableType(field->type, field->subtype); string type = GetTableType(field->type, field->subtype);
@ -167,19 +161,15 @@ bool SQLite::DoInit(const WriterInfo& info, int num_fields,
/* if ( !field->optional ) { /* if ( !field->optional ) {
create += " NOT NULL"; create += " NOT NULL";
} */ } */
} }
create += "\n);"; create += "\n);";
//printf("Create: %s\n", create.c_str());
{ {
char *errorMsg = 0; char *errorMsg = 0;
int res = sqlite3_exec(db, create.c_str(), NULL, NULL, &errorMsg); int res = sqlite3_exec(db, create.c_str(), NULL, NULL, &errorMsg);
if ( res != SQLITE_OK ) if ( res != SQLITE_OK )
{ {
//printf("Error executing table creation statement: %s", errorMsg);
Error(Fmt("Error executing table creation statement: %s", errorMsg)); Error(Fmt("Error executing table creation statement: %s", errorMsg));
sqlite3_free(errorMsg); sqlite3_free(errorMsg);
return false; return false;
@ -206,16 +196,20 @@ bool SQLite::DoInit(const WriterInfo& info, int num_fields,
insert += "?"; insert += "?";
string fieldname = fields[i]->name; char* fieldname = sqlite3_mprintf("%Q", fields[i]->name);
replace( fieldname.begin(), fieldname.end(), '.', '_' ); // sqlite does not like "." in row names. printf("Fieldname: %s\n", fieldname);
names += fieldname; if ( fieldname == 0 )
{
InternalError("Could not malloc memory");
return false;
}
names.append(fieldname);
sqlite3_free(fieldname);
} }
insert += ");"; insert += ");";
names += ") "; names += ") ";
insert = names + insert; insert = names + insert;
//printf("Prepared insert: %s\n\n", insert.c_str());
if ( checkError(sqlite3_prepare_v2( db, insert.c_str(), insert.size()+1, &st, NULL )) ) if ( checkError(sqlite3_prepare_v2( db, insert.c_str(), insert.size()+1, &st, NULL )) )
return false; return false;
@ -242,12 +236,9 @@ int SQLite::AddParams(Value* val, int pos)
{ {
if ( ! val->present ) if ( ! val->present )
{ return sqlite3_bind_null(st, pos);
return sqlite3_bind_null(st, pos);
}
switch ( val->type ) { switch ( val->type ) {
case TYPE_BOOL: case TYPE_BOOL:
return sqlite3_bind_int(st, pos, val->val.int_val ? 1 : 0 ); return sqlite3_bind_int(st, pos, val->val.int_val ? 1 : 0 );
@ -263,13 +254,13 @@ int SQLite::AddParams(Value* val, int pos)
case TYPE_SUBNET: case TYPE_SUBNET:
{ {
string out = io->Render(val->val.subnet_val).c_str(); string out = io->Render(val->val.subnet_val);
return sqlite3_bind_text(st, pos, out.data(), out.size(), SQLITE_TRANSIENT); return sqlite3_bind_text(st, pos, out.data(), out.size(), SQLITE_TRANSIENT);
} }
case TYPE_ADDR: case TYPE_ADDR:
{ {
string out = io->Render(val->val.addr_val).c_str(); string out = io->Render(val->val.addr_val);
return sqlite3_bind_text(st, pos, out.data(), out.size(), SQLITE_TRANSIENT); return sqlite3_bind_text(st, pos, out.data(), out.size(), SQLITE_TRANSIENT);
} }