diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8fc541ee78..435fbdf30f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -438,6 +438,7 @@ set(bro_SRCS input/readers/Ascii.cc input/readers/Raw.cc input/readers/Benchmark.cc + input/readers/SQLite.cc nb_dns.c digest.h diff --git a/src/input/Manager.cc b/src/input/Manager.cc index b5dfdcb2cd..791035bba7 100644 --- a/src/input/Manager.cc +++ b/src/input/Manager.cc @@ -9,6 +9,10 @@ #include "readers/Raw.h" #include "readers/Benchmark.h" +#ifdef USE_SQLITE +#include "readers/SQLite.h" +#endif + #include "Event.h" #include "EventHandler.h" #include "NetVar.h" @@ -34,6 +38,9 @@ ReaderDefinition input_readers[] = { { BifEnum::Input::READER_ASCII, "Ascii", 0, reader::Ascii::Instantiate }, { BifEnum::Input::READER_RAW, "Raw", 0, reader::Raw::Instantiate }, { BifEnum::Input::READER_BENCHMARK, "Benchmark", 0, reader::Benchmark::Instantiate }, +#ifdef USE_SQLITE + { BifEnum::Input::READER_SQLITE, "SQLite", 0, reader::SQLite::Instantiate }, +#endif // End marker { BifEnum::Input::READER_DEFAULT, "None", 0, (ReaderBackend* (*)(ReaderFrontend* frontend))0 } diff --git a/src/input/readers/SQLite.cc b/src/input/readers/SQLite.cc new file mode 100644 index 0000000000..53a3609e24 --- /dev/null +++ b/src/input/readers/SQLite.cc @@ -0,0 +1,356 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "config.h" + +#ifdef USE_SQLITE + +#include "SQLite.h" +#include "NetVar.h" + +#include +#include + +#include "../../threading/SerialTypes.h" + +#include +#include +#include + +using namespace input::reader; +using threading::Value; +using threading::Field; + + +SQLite::SQLite(ReaderFrontend *frontend) : ReaderBackend(frontend) + { + + } + +SQLite::~SQLite() + { + DoClose(); + + } + +void SQLite::DoClose() + { + if ( db != 0 ) + { + sqlite3_close(db); + db = 0; + } + } + +bool SQLite::checkError( int code ) + { + if ( code != SQLITE_OK && code != SQLITE_DONE ) + { + Error(Fmt("SQLite call failed: %s", sqlite3_errmsg(db))); + return true; + } + + return false; + } + +bool SQLite::DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* arg_fields) + { + started = false; + + string fullpath(info.source); + fullpath.append(".sqlite"); + + string dbname; + map::const_iterator it = info.config.find("dbname"); + if ( it == info.config.end() ) + { + MsgThread::Info(Fmt("dbname configuration option not found. Defaulting to source %s", info.source)); + Error(Fmt("dbname configuration option not found. Defaulting to source %s", info.source)); + dbname = info.source; + } + else + dbname = it->second; + + string query; + it = info.config.find("query"); + if ( it == info.config.end() ) + { + Error(Fmt("No query specified when setting up SQLite data source. Aborting.", info.source)); + return false; + } + else + query = it->second; + + if ( checkError(sqlite3_open_v2( + fullpath.c_str(), + &db, + SQLITE_OPEN_READWRITE | + SQLITE_OPEN_FULLMUTEX // perhaps change to nomutex + , + NULL)) ) + return false; + + + num_fields = arg_num_fields; + fields = arg_fields; + + // create the prepared select statement that we will re-use forever... + if ( checkError(sqlite3_prepare_v2( db, query.c_str(), query.size()+1, &st, NULL )) ) + { + return false; + } + + + DoUpdate(); + + return true; + } + +Value* SQLite::EntryToVal(sqlite3_stmt *st, const threading::Field *field, int pos) + { + + if ( sqlite3_column_type(st, pos ) == SQLITE_NULL ) + return new Value(field->type, false); + + Value* val = new Value(field->type, true); + + switch ( field->type ) { + case TYPE_ENUM: + case TYPE_STRING: + { + const char *text = (const char*) sqlite3_column_text(st, pos); + int length = sqlite3_column_bytes(st, pos); + + char *out = new char[length]; + memcpy(out, text, length); + + val->val.string_val.length = length; + val->val.string_val.data = out; + break; + } + + case TYPE_BOOL: + { + if ( sqlite3_column_type(st, pos) != SQLITE_INTEGER ) { + Error("Invalid data type for boolean - expected Integer"); + return 0; + } + + int res = sqlite3_column_int(st, pos); + + if ( res == 0 || res == 1 ) + val->val.int_val = res; + else + { + Error(Fmt("Invalid value for boolean: %d", res)); + return 0; + } + break; + } + + case TYPE_INT: + val->val.int_val = sqlite3_column_int64(st, pos); + printf("Value: %d\n", val->val.int_val); + break; + + case TYPE_DOUBLE: + case TYPE_TIME: + case TYPE_INTERVAL: + val->val.double_val = sqlite3_column_double(st, pos); + break; + + case TYPE_COUNT: + case TYPE_COUNTER: + val->val.uint_val = sqlite3_column_int64(st, pos); + break; + + case TYPE_PORT: + val->val.port_val.port = sqlite3_column_int(st, pos); + val->val.port_val.proto = TRANSPORT_UNKNOWN; + break; + + case TYPE_SUBNET: { + const char *text = (const char*) sqlite3_column_text(st, pos); + string s(text, sqlite3_column_bytes(st, pos)); + int pos = s.find("/"); + int width = atoi(s.substr(pos+1).c_str()); + string addr = s.substr(0, pos); + + val->val.subnet_val.prefix = StringToAddr(addr); + val->val.subnet_val.length = width; + break; + + } + case TYPE_ADDR: + { + const char *text = (const char*) sqlite3_column_text(st, pos); + string s(text, sqlite3_column_bytes(st, pos)); + val->val.addr_val = StringToAddr(s); + break; + } + + case TYPE_TABLE: + case TYPE_VECTOR: + assert(false); + /* + // First - common initialization + // Then - initialization for table. + // Then - initialization for vector. + // Then - common stuff + { + // how many entries do we have... + unsigned int length = 1; + for ( unsigned int i = 0; i < s.size(); i++ ) + if ( s[i] == ',') length++; + + unsigned int pos = 0; + + if ( s.compare(empty_field) == 0 ) + length = 0; + + + Value** lvals = new Value* [length]; + + if ( field->type == TYPE_TABLE ) + { + val->val.set_val.vals = lvals; + val->val.set_val.size = length; + } + else if ( field->type == TYPE_VECTOR ) + { + val->val.vector_val.vals = lvals; + val->val.vector_val.size = length; + else + assert(false); + + if ( length == 0 ) + break; //empty + + istringstream splitstream(s); + while ( splitstream ) + { + string element; + + if ( !getline(splitstream, element, ',') ) + break; + + if ( pos >= length ) + { + Error(Fmt("Internal error while parsing set. pos %d >= length %d. Element: %s", pos, length, element.c_str())); + break; + } + + Field* newfield = new Field(*field); + newfield->type = field->subtype; + Value* newval = EntryToVal(element, newfield); + delete(newfield); + if ( newval == 0 ) + { + Error("Error while reading set"); + return 0; + } + lvals[pos] = newval; + + pos++; + + } + + + if ( pos != length ) + { + Error("Internal error while parsing set: did not find all elements"); + return 0; + } + + break; + } + */ + + + default: + Error(Fmt("unsupported field format %d", field->type)); + return 0; + } + + return val; + + } + +bool SQLite::DoUpdate() + { + + int numcolumns = sqlite3_column_count(st); + + /* This can happen legitimately I think... + if ( numcolumns != num_fields ) + { + Error(Fmt("SQLite query returned %d results, but input framework expected %d. Aborting", numcolumns, num_fields)); + return false; + } + */ + + int *mapping = new int [num_fields]; + // first set them all to -1 + for ( unsigned int i = 0; i < num_fields; ++i ) { + mapping[i] = -1; + } + + for ( unsigned int i = 0; i < numcolumns; ++i ) + { + const char *name = sqlite3_column_name(st, i); + + for ( unsigned j = 0; j < num_fields; j++ ) { + if ( strcmp(fields[j]->name, name) == 0 ) { + if ( mapping[j] != -1 ) + { + Error(Fmt("SQLite statement returns several columns with name %s! Cannot decide which to choose, aborting", name)); + return false; + } + + mapping[j] = i; + break; + } + } + + } + + for ( unsigned int i = 0; i < num_fields; ++i ) { + if ( mapping[i] == -1 ) + { + Error(Fmt("Required field %s not found after SQLite statement", fields[i]->name)); + return false; + } + } + + int errorcode; + while ( ( errorcode = sqlite3_step(st)) == SQLITE_ROW ) + { + Value** ofields = new Value*[num_fields]; + + for ( unsigned int j = 0; j < num_fields; ++j) + { + + ofields[j] = EntryToVal(st, fields[j], mapping[j]); + if ( ofields[j] == 0 ) { + return false; + } + + } + + SendEntry(ofields); + } + + if ( checkError(errorcode) ) // check the last error code returned by sqlite + return false; + + + EndCurrentSend(); + + delete (mapping); + + if ( checkError(sqlite3_reset(st)) ) + return false; + + return true; + } + +#endif /* USE_SQLITE */ diff --git a/src/input/readers/SQLite.h b/src/input/readers/SQLite.h new file mode 100644 index 0000000000..5ed24ce393 --- /dev/null +++ b/src/input/readers/SQLite.h @@ -0,0 +1,60 @@ +// See the file "COPYING" in the main distribution directory for copyright. +// + +#ifndef INPUT_READERS_POSTGRES_H +#define INPUT_READERS_POSTGRES_H + +#include "config.h" + +#ifdef USE_SQLITE + +#include +#include + +#include "../ReaderBackend.h" +#include "sqlite3.h" + +namespace input { namespace reader { + +class SQLite : public ReaderBackend { +public: + SQLite(ReaderFrontend* frontend); + ~SQLite(); + + static ReaderBackend* Instantiate(ReaderFrontend* frontend) { return new SQLite(frontend); } + +protected: + virtual bool DoInit(const ReaderInfo& info, int arg_num_fields, const threading::Field* const* fields); + + virtual void DoClose(); + + virtual bool DoUpdate(); + + virtual bool DoHeartbeat(double network_time, double current_time) { return true; } +private: + bool checkError(int code); + + unsigned int num_fields; + + const threading::Field* const * fields; // raw mapping + + threading::Value* EntryToVal(sqlite3_stmt *st, const threading::Field *field, int pos); + + int mode; + + bool started; + string query; + + sqlite3 *db; + sqlite3_stmt *st; + +}; + + +} +} + +#endif /* USE_SQLITE */ + +#endif /* INPUT_READERS_POSTGRES_H */ + diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc index bea19018ee..2a7ef63295 100644 --- a/src/logging/Manager.cc +++ b/src/logging/Manager.cc @@ -26,8 +26,6 @@ #include "writers/DataSeries.h" #endif -#define USE_SQLITE 1 - #ifdef USE_SQLITE #include "writers/SQLite.h" #endif diff --git a/src/logging/writers/SQLite.cc b/src/logging/writers/SQLite.cc index 9c78636867..1028710fc0 100644 --- a/src/logging/writers/SQLite.cc +++ b/src/logging/writers/SQLite.cc @@ -1,10 +1,11 @@ // See the file "COPYING" in the main distribution directory for copyright. -#define USE_SQLITE 1 -#ifdef USE_SQLITE #include "config.h" + +#ifdef USE_SQLITE + #include #include @@ -100,7 +101,6 @@ bool SQLite::checkError( int code ) { if ( code != SQLITE_OK && code != SQLITE_DONE ) { - printf("SQLite call failed: %s\n", sqlite3_errmsg(db)); Error(Fmt("SQLite call failed: %s", sqlite3_errmsg(db))); return true; } diff --git a/src/logging/writers/SQLite.h b/src/logging/writers/SQLite.h index 8a3baaec9e..0c8addc9da 100644 --- a/src/logging/writers/SQLite.h +++ b/src/logging/writers/SQLite.h @@ -2,6 +2,7 @@ // // Log writer for SQLITE logs. + #ifndef LOGGING_WRITER_SQLITE_H #define LOGGING_WRITER_SQLITE_H diff --git a/src/types.bif b/src/types.bif index 3def034219..4400c78b52 100644 --- a/src/types.bif +++ b/src/types.bif @@ -189,6 +189,7 @@ enum Reader %{ READER_ASCII, READER_RAW, READER_BENCHMARK, + READER_SQLITE, %} enum Event %{