diff --git a/CMakeLists.txt b/CMakeLists.txt
index 28b702ab01..404cdfeeb5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -122,6 +122,14 @@ if (LINTEL_FOUND AND DATASERIES_FOUND AND LIBXML2_FOUND)
     list(APPEND OPTLIBS ${LibXML2_LIBRARIES})
 endif()
 
+set(USE_LIBCURL false)
+find_package(CURL)
+if (CURL_FOUND)
+    set(USE_LIBCURL true)
+    include_directories(BEFORE ${CURL_INCLUDE_DIR})
+    list(APPEND OPTLIBS ${CURL_LIBRARIES})
+endif()
+
 if (ENABLE_PERFTOOLS_DEBUG)
     # Just a no op to prevent CMake from complaining about manually-specified
     # ENABLE_PERFTOOLS_DEBUG not being used if google perftools weren't found
@@ -209,11 +217,13 @@ message(
     "\nBroccoli:          ${INSTALL_BROCCOLI}"
     "\nBroctl:            ${INSTALL_BROCTL}"
     "\nAux. Tools:        ${INSTALL_AUX_TOOLS}"
+    "\nElasticSearch:     ${INSTALL_ELASTICSEARCH}"
     "\n"
     "\nGeoIP:             ${USE_GEOIP}"
     "\nGoogle perftools:  ${USE_PERFTOOLS}"
     "\n        debugging: ${USE_PERFTOOLS_DEBUG}"
     "\nDataSeries:        ${USE_DATASERIES}"
+    "\nlibCURL:           ${USE_LIBCURL}"
     "\n"
     "\n================================================================\n"
 )
diff --git a/config.h.in b/config.h.in
index c2cb3ec1dc..66121cefbf 100644
--- a/config.h.in
+++ b/config.h.in
@@ -117,6 +117,9 @@
 /* Use the DataSeries writer. */
 #cmakedefine USE_DATASERIES
 
+/* Build the ElasticSearch writer. */
+#cmakedefine INSTALL_ELASTICSEARCH
+
 /* Version number of package */
 #define VERSION "@VERSION@"
 
diff --git a/configure b/configure
index 3258d4abfc..7ea5613a6d 100755
--- a/configure
+++ b/configure
@@ -35,6 +35,7 @@ Usage: $0 [OPTION]... [VAR=VALUE]...
     --disable-auxtools       don't build or install auxiliary tools
     --disable-python         don't try to build python bindings for broccoli
     --disable-ruby           don't try to build ruby bindings for broccoli
+    --enable-elasticsearch   build the elasticsearch writer
 
   Required Packages in Non-Standard Locations:
     --with-openssl=PATH      path to OpenSSL install root
@@ -98,6 +99,7 @@ append_cache_entry BRO_SCRIPT_INSTALL_PATH STRING $prefix/share/bro
 append_cache_entry BRO_ETC_INSTALL_DIR     PATH   $prefix/etc
 append_cache_entry ENABLE_DEBUG            BOOL   false
 append_cache_entry ENABLE_PERFTOOLS_DEBUG  BOOL   false
+append_cache_entry INSTALL_ELASTICSEARCH   BOOL   false
 append_cache_entry BinPAC_SKIP_INSTALL     BOOL   true
 append_cache_entry BUILD_SHARED_LIBS       BOOL   true
 append_cache_entry INSTALL_AUX_TOOLS       BOOL   true
@@ -156,6 +158,9 @@ while [ $# -ne 0 ]; do
         --disable-auxtools)
             append_cache_entry INSTALL_AUX_TOOLS BOOL false
             ;;
+        --enable-elasticsearch)
+            append_cache_entry INSTALL_ELASTICSEARCH BOOL true
+            ;;
         --disable-python)
             append_cache_entry DISABLE_PYTHON_BINDINGS BOOL true
             ;;
diff --git a/scripts/base/frameworks/logging/__load__.bro b/scripts/base/frameworks/logging/__load__.bro
index 17e03e2ef7..7dafc45397 100644
--- a/scripts/base/frameworks/logging/__load__.bro
+++ b/scripts/base/frameworks/logging/__load__.bro
@@ -2,3 +2,4 @@
 @load ./postprocessors
 @load ./writers/ascii
 @load ./writers/dataseries
+@load ./writers/elasticsearch
\ No newline at end of file
diff --git a/scripts/base/frameworks/logging/writers/elasticsearch.bro b/scripts/base/frameworks/logging/writers/elasticsearch.bro
new file mode 100644
index 0000000000..82dbcc43d4
--- /dev/null
+++ b/scripts/base/frameworks/logging/writers/elasticsearch.bro
@@ -0,0 +1,25 @@
+module LogElasticSearch;
+
+export {
+	## Name of the ES cluster
+	const cluster_name = "elasticsearch" &redef;
+
+	## ES Server
+	const server_host = "127.0.0.1" &redef;
+
+	## ES Port
+	const server_port = 9200 &redef;
+
+	## Name of the ES index
+	const index_name = "bro-logs" &redef;
+
+	## The ES type prefix comes before the name of the related log.
+	## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
+	const type_prefix = "" &redef;
+
+	## The batch size is the number of messages that will be queued up before
+	## they are sent to be bulk indexed.
+	## Note: this is mainly a memory usage parameter.
+	const batch_size = 10000 &redef;
+}
+
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6a68d1e7c5..fbbb01fd22 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -419,6 +419,7 @@ set(bro_SRCS
     logging/WriterFrontend.cc
     logging/writers/Ascii.cc
     logging/writers/DataSeries.cc
+    logging/writers/ElasticSearch.cc
     logging/writers/None.cc
 
     input/Manager.cc
diff --git a/src/logging.bif b/src/logging.bif
index efc6ed0b4b..308ea78b7a 100644
--- a/src/logging.bif
+++ b/src/logging.bif
@@ -81,3 +81,14 @@ const extent_size: count;
 const dump_schema: bool;
 const use_integer_for_time: bool;
 const num_threads: count;
+
+# Options for the ElasticSearch writer.
+
+module LogElasticSearch;
+
+const cluster_name: string;
+const server_host: string;
+const server_port: count;
+const index_name: string;
+const type_prefix: string;
+const batch_size: count;
diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc
index d338ac97f8..ddfed0f70f 100644
--- a/src/logging/Manager.cc
+++ b/src/logging/Manager.cc
@@ -17,7 +17,7 @@
 #include "writers/Ascii.h"
 #include "writers/None.h"
 
-#ifdef USE_ELASTICSEARCH
+#ifdef INSTALL_ELASTICSEARCH
 #include "writers/ElasticSearch.h"
 #endif
 
@@ -40,8 +40,8 @@ WriterDefinition log_writers[] = {
 	{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
 	{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
 
-#ifdef USE_ELASTICSEARCH
-	{ BifEnum::Log::WRITER_ASCII, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
+#ifdef INSTALL_ELASTICSEARCH
+	{ BifEnum::Log::WRITER_ELASTICSEARCH, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
 #endif
 
 #ifdef USE_DATASERIES
diff --git a/src/logging/writers/ElasticSearch.cc b/src/logging/writers/ElasticSearch.cc
index eb83f26542..61f3734f87 100644
--- a/src/logging/writers/ElasticSearch.cc
+++ b/src/logging/writers/ElasticSearch.cc
@@ -2,7 +2,7 @@
 
 #include "config.h"
 
-#ifdef USE_ELASTICSEARCH
+#ifdef INSTALL_ELASTICSEARCH
 
 #include <string>
 #include <errno.h>
@@ -12,6 +12,9 @@
 #include "NetVar.h"
 #include "threading/SerialTypes.h"
 
+#include <curl/curl.h>
+#include <curl/easy.h>
+
 #include "ElasticSearch.h"
 
 using namespace logging;
@@ -24,28 +27,35 @@ using threading::Field;
 ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
 	{
 	cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
-	cluster_name = new char[cluster_name_len];
+	cluster_name = new char[cluster_name_len + 1];
 	memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
+	cluster_name[cluster_name_len] = 0;
 
 	server_host_len = BifConst::LogElasticSearch::server_host->Len();
-	server_host = new char[server_host_len];
+	server_host = new char[server_host_len + 1];
 	memcpy(server_host, BifConst::LogElasticSearch::server_host->Bytes(), server_host_len);
+	server_host[server_host_len] = 0;
 
 	index_name_len = BifConst::LogElasticSearch::index_name->Len();
-	index_name = new char[index_name_len];
+	index_name = new char[index_name_len + 1];
 	memcpy(index_name, BifConst::LogElasticSearch::index_name->Bytes(), index_name_len);
+	index_name[index_name_len] = 0;
 
 	type_prefix_len = BifConst::LogElasticSearch::type_prefix->Len();
-	type_prefix = new char[type_prefix_len];
+	type_prefix = new char[type_prefix_len + 1];
 	memcpy(type_prefix, BifConst::LogElasticSearch::type_prefix->Bytes(), type_prefix_len);
+	type_prefix[type_prefix_len] = 0;
 
 	server_port = BifConst::LogElasticSearch::server_port;
 	batch_size = BifConst::LogElasticSearch::batch_size;
 
-	buffer = safe_malloc(MAX_EVENT_SIZE * batch_size);
+	buffer = (char *)safe_malloc(MAX_EVENT_SIZE * batch_size);
 	current_offset = 0;
-	buffer[current_offset] = "\0";
+	buffer[current_offset] = 0;
 	counter = 0;
+
+	curl_handle = HTTPSetup();
+	curl_result = new char[1024];
 	}
 
 ElasticSearch::~ElasticSearch()
@@ -74,115 +84,129 @@ bool ElasticSearch::DoFinish()
 	return WriterBackend::DoFinish();
 	}
 
-char* ElasticSearch::FormatField(const char* field_name, const char* field_value)
-	{
-	char* result = new char[MAX_EVENT_SIZE];
-	strcpy(result, "\"");
-	strcpy(result, field_name);
-	strcpy(result, "\":\"");
-	strcpy(result, field_value);
-	strcpy(result, "\"");
-	return result;
-
-	}
-
 bool ElasticSearch::BatchIndex()
 	{
-	file = fopen("/tmp/batch.test", 'w');
-	fwrite(buffer, current_offset, 1, file);
-	fclose(file);
-	file = 0;
+	return HTTPSend();
+	}
+
+char* ElasticSearch::FieldToString(Value* val, const Field* field)
+	{
+	char* result = new char[MAX_EVENT_SIZE];
+
+	switch ( val->type ) {
+
+	// ElasticSearch defines bools as: 0 == false, everything else == true. So we treat it as an int.
+	case TYPE_BOOL:
+	case TYPE_INT:
+		sprintf(result, "%d", (int) val->val.int_val); return result;
+
+	case TYPE_COUNT:
+	case TYPE_COUNTER:
+		sprintf(result, "%d", (int) val->val.uint_val); return result;
+
+	case TYPE_PORT:
+		sprintf(result, "%d", (int) val->val.port_val.port); return result;
+
+	case TYPE_SUBNET:
+		sprintf(result, "\"%s\"", Render(val->val.subnet_val).c_str()); return result;
+
+	case TYPE_ADDR:
+		sprintf(result, "\"%s\"", Render(val->val.addr_val).c_str()); return result;
+
+	case TYPE_INTERVAL:
+	case TYPE_TIME:
+		sprintf(result, "\"%d\"", (int) (val->val.double_val * 1000)); return result;
+
+	case TYPE_DOUBLE:
+		sprintf(result, "\"%s\"", Render(val->val.double_val).c_str()); return result;
+
+	case TYPE_ENUM:
+	case TYPE_STRING:
+	case TYPE_FILE:
+	case TYPE_FUNC:
+		{
+		int size = val->val.string_val->size();
+		const char* data = val->val.string_val->data();
+
+		if ( ! size )
+			return 0;
+		sprintf(result, "\"%s\"", data); return result;
+		}
+
+	case TYPE_TABLE:
+		{
+		char* tmp = new char[MAX_EVENT_SIZE];
+		int tmp_offset = 0;
+		strcpy(tmp, "{");
+		tmp_offset = 1;
+		bool result_seen = false;
+		for ( int j = 0; j < val->val.set_val.size; j++ )
+			{
+			char* sub_field = FieldToString(val->val.set_val.vals[j], field);
+			if ( sub_field ){
+
+				if ( result_seen ){
+					strcpy(tmp + tmp_offset, ",");
+					tmp_offset += 1;
+					}
+				else
+					result_seen = true;
+
+				sprintf(tmp + tmp_offset, "\"%s\":%s", field->name.c_str(), sub_field);
+				tmp_offset = strlen(tmp);
+				}
+			}
+		strcpy(tmp + tmp_offset, "}");
+		tmp_offset += 1;
+		sprintf(result, "%s", tmp);
+		return result;
+		}
+
+	case TYPE_VECTOR:
+		{
+		char* tmp = new char[MAX_EVENT_SIZE];
+		int tmp_offset = 0;
+		strcpy(tmp, "{");
+		tmp_offset = 1;
+		bool result_seen = false;
+		for ( int j = 0; j < val->val.vector_val.size; j++ )
+			{
+			char* sub_field = FieldToString(val->val.vector_val.vals[j], field);
+			if ( sub_field ){
+
+				if ( result_seen ){
+					strcpy(tmp + tmp_offset, ",");
+					tmp_offset += 1;
+					}
+				else
+					result_seen = true;
+
+				sprintf(tmp + tmp_offset, "\"%s\":%s", field->name.c_str(), sub_field);
+				tmp_offset = strlen(tmp);
+				}
+			}
+		strcpy(tmp + tmp_offset, "}");
+		tmp_offset += 1;
+		sprintf(result, "%s", tmp);
+		return result;
+		}
+
+	default:
+		{
+		return (char *)"{}";
+		}
+
+	}
 	}
 
 char* ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
 	{
 	if ( ! val->present )
-		{
-		return "";
-		}
-
-	switch ( val->type ) {
-
-	case TYPE_BOOL:
-		return FormatField(field->name, val->val.int_val ? "T" : "F");
-
-	case TYPE_INT:
-		return FormatField(field->name, val->val.int_val);
-
-	case TYPE_COUNT:
-	case TYPE_COUNTER:
-		return FormatField(field->name, val->val.uint_val);
-
-	case TYPE_PORT:
-		return FormatField(field->name, val->val.port_val.port);
-
-	case TYPE_SUBNET:
-		return FormatField(field->name, Render(val->val.subnet_val));
-
-	case TYPE_ADDR:
-		return FormatField(field->name, Render(val->val.addr_val));
-
-	case TYPE_INTERVAL:
-	case TYPE_TIME:
-	case TYPE_DOUBLE:
-		return FormatField(field->name, val->val.double_val);
-
-	case TYPE_ENUM:
-	case TYPE_STRING:
-	case TYPE_FILE:
-	case TYPE_FUNC:
-		{
-		int size = val->val.string_val->size();
-		const char* data = val->val.string_val->data();
-
-		if ( ! size )
-			return "";
-		return FormatField(field->name, val->val.string_val->data());
-		}
-
-	case TYPE_TABLE:
-		{
-		if ( ! val->val.set_val.size )
-			return "";
-
-		char* tmp = new char[MAX_EVENT_SIZE];
-		strcpy(tmp, "{");
-		for ( int j = 0; j < val->val.set_val.size; j++ )
-			{
-			char* result = AddFieldToBuffer(val->val.set_val.vals[j], field);
-			bool resultSeen = false;
-			if ( result ){
-				if ( resultSeen )
-					strcpy(tmp, ",");
-				strcpy(tmp, result);
-				}
-			}
-		return FormatField(field->name, tmp);
-		}
-
-	case TYPE_VECTOR:
-		{
-		if ( ! val->val.vector_val.size )
-			return "";
-
-		char* tmp = new char[MAX_EVENT_SIZE];
-		strcpy(tmp, "{");
-		for ( int j = 0; j < val->val.vector_val.size; j++ )
-			{
-			char* result = AddFieldToBuffer(val->val.vector_val.vals[j], field);
-			bool resultSeen = false;
-			if ( result ){
-				if ( resultSeen )
-					strcpy(tmp, ",");
-				strcpy(tmp, result);
-				}
-			}
-		return FormatField(field->name, tmp);
-		}
-
-	default:
-		return "";
-	}
+		return 0;
+
+	char* result = new char[MAX_EVENT_SIZE];
+	sprintf(result, "\"%s\":%s", field->name.c_str(), FieldToString(val, field));
+	return result;
 	}
@@ -190,39 +214,37 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
 			     Value** vals)
 	{
 	// Our action line looks like:
-	// {"index":"$index_name","type":"$type_prefix$path"}\n{
+	// {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n{
 
 	bool resultSeen = false;
 
 	for ( int i = 0; i < num_fields; i++ )
 		{
-		char* result = DoWriteOne(vals[i], fields[i]);
+		char* result = AddFieldToBuffer(vals[i], fields[i]);
 		if ( result )
 			{
 			if ( ! resultSeen )
 				{
-				strcpy(buffer[current_offset], "{\"index\":\"");
-				strcat(buffer[current_offset], index_name);
-				strcat(buffer[current_offset], "\",\"type\":\"");
-				strcat(buffer[current_offset], type_prefix);
-				strcat(buffer[current_offset], Path());
-				strcat(buffer[current_offset], "\"}\n{");
-				current_offset = strlen(buffer);
+				current_offset += sprintf(buffer + current_offset, "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s%s\"}}\n{", index_name, type_prefix, Path().c_str());
 				resultSeen = true;
 				}
 			else
 				{
-				strcat(buffer[current_offset], ",");
+				strcat(buffer, ",");
 				current_offset += 1;
 				}
 
-			strcat(buffer[current_offset], result);
+			strcat(buffer, result);
 			current_offset += strlen(result);
 			}
 		}
 
 	if ( resultSeen )
 		{
-		strcat(buffer[current_offset], "}\n");
+		strcat(buffer, "}\n");
 		current_offset += 2;
 		counter += 1;
-		if ( counter >= batch_size )
+		if ( counter >= batch_size ){
 			BatchIndex();
+			current_offset = 0;
+			buffer[current_offset] = 0;
+			counter = 0;
+			}
 		}
 	return true;
 	}
@@ -239,4 +261,55 @@ bool ElasticSearch::DoSetBuf(bool enabled)
 	return true;
 	}
 
+// HTTP Functions start here.
+
+CURL* ElasticSearch::HTTPSetup()
+	{
+	char URL[2048];
+	CURL* handle;
+	struct curl_slist *headers=NULL;
+
+	handle = curl_easy_init();
+	if ( ! handle )
+		return handle;
+
+	sprintf(URL, "http://%s:%d/_bulk", server_host, (int) server_port);
+	curl_easy_setopt(handle, CURLOPT_URL, URL);
+
+	headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
+	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
+
+	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
+	curl_easy_setopt(handle, CURLOPT_POST, 1); // All requests are POSTs
+
+	// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed.
+	// The best (only?) way to disable that is to just use HTTP 1.0.
+	curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
+	return handle;
+	}
+
+bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata){
+	// TODO: Do some verification on the result?
+	return true;
+	}
+
+bool ElasticSearch::HTTPSend(){
+	CURLcode return_code;
+
+	curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, curl_result);
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer);
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, current_offset);
+
+	return_code = curl_easy_perform(curl_handle);
+	switch(return_code) {
+	case CURLE_COULDNT_CONNECT:
+	case CURLE_COULDNT_RESOLVE_HOST:
+	case CURLE_WRITE_ERROR:
+		return false;
+	default:
+		return true;
+	}
+	}
+
 #endif
diff --git a/src/logging/writers/ElasticSearch.h b/src/logging/writers/ElasticSearch.h
index 870290a6e0..ad3729f6da 100644
--- a/src/logging/writers/ElasticSearch.h
+++ b/src/logging/writers/ElasticSearch.h
@@ -5,6 +5,7 @@
 #ifndef LOGGING_WRITER_ELASTICSEARCH_H
 #define LOGGING_WRITER_ELASTICSEARCH_H
 
+#include <curl/curl.h>
 #include "../WriterBackend.h"
 
 namespace logging { namespace writer {
@@ -34,12 +35,20 @@ protected:
 
 private:
	char* AddFieldToBuffer(threading::Value* val, const threading::Field* field);
-	char* FormatField(const char* field_name, const char* field_value);
+	char* FieldToString(threading::Value* val, const threading::Field* field);
 	bool BatchIndex();
 
+	CURL* HTTPSetup();
+	static bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
+	bool HTTPSend();
+
+	// Buffers, etc.
 	char* buffer;
 	int current_offset;
-	int counter;
+	uint64 counter;
+
+	CURL* curl_handle;
+	char* curl_result;
 
 	// From scripts
 	char* cluster_name;
diff --git a/src/types.bif b/src/types.bif
index 76bac3e0e2..9b387b2c52 100644
--- a/src/types.bif
+++ b/src/types.bif
@@ -163,6 +163,7 @@ enum Writer %{
 	WRITER_NONE,
 	WRITER_ASCII,
 	WRITER_DATASERIES,
+	WRITER_ELASTICSEARCH,
 %}
 
 enum ID %{
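
Reviewer note, not part of the patch: the sketch below replays, outside of Bro, the same bulk-index conversation that HTTPSetup() and HTTPSend() implement above, which is handy for testing an ES endpoint in isolation. It reuses the patch's defaults ("bro-logs", 127.0.0.1:9200); the "bro_conn" type and the sample record fields are invented for illustration, and only libcurl calls that exist in the stock API are used.

// Standalone sketch: send one ElasticSearch bulk request the way the
// writer does. Build with: g++ sketch.cc -lcurl
#include <curl/curl.h>

#include <cstdio>
#include <cstring>

int main()
	{
	// The bulk API takes pairs of newline-terminated lines: an action line
	// naming the target index and type, then the JSON document itself.
	const char* bulk =
		"{\"index\":{\"_index\":\"bro-logs\",\"_type\":\"bro_conn\"}}\n"
		"{\"ts\":\"1338319035000\",\"id.orig_h\":\"10.0.0.1\"}\n";

	CURL* handle = curl_easy_init();
	if ( ! handle )
		return 1;

	struct curl_slist* headers =
		curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");

	// Mirrors HTTPSetup()/HTTPSend(): POST the buffered batch to _bulk.
	curl_easy_setopt(handle, CURLOPT_URL, "http://127.0.0.1:9200/_bulk");
	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
	curl_easy_setopt(handle, CURLOPT_POST, 1L);
	curl_easy_setopt(handle, CURLOPT_POSTFIELDS, bulk);
	curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE, (long) strlen(bulk));

	CURLcode rc = curl_easy_perform(handle);
	if ( rc != CURLE_OK )
		fprintf(stderr, "bulk index failed: %s\n", curl_easy_strerror(rc));

	curl_slist_free_all(headers);
	curl_easy_cleanup(handle);
	return rc == CURLE_OK ? 0 : 1;
	}

A healthy server answers with a JSON body reporting one result per action line; since HTTPReceive() currently discards that body, running a request like this by hand is the easiest way to spot per-document indexing errors.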