diff --git a/CMakeLists.txt b/CMakeLists.txt
index bea83b0de6..f667c0cfe0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -124,6 +124,17 @@ if (LINTEL_FOUND AND DATASERIES_FOUND AND LIBXML2_FOUND)
     list(APPEND OPTLIBS ${LibXML2_LIBRARIES})
 endif()
 
+set(USE_ELASTICSEARCH false)
+set(USE_CURL false)
+find_package(CURL)
+
+if (CURL_FOUND)
+    set(USE_ELASTICSEARCH true)
+    set(USE_CURL true)
+    include_directories(BEFORE ${CURL_INCLUDE_DIR})
+    list(APPEND OPTLIBS ${CURL_LIBRARIES})
+endif()
+
 if (ENABLE_PERFTOOLS_DEBUG)
     # Just a no op to prevent CMake from complaining about manually-specified
     # ENABLE_PERFTOOLS_DEBUG not being used if google perftools weren't found
@@ -215,7 +226,10 @@ message(
     "\nGeoIP:             ${USE_GEOIP}"
     "\nGoogle perftools:  ${USE_PERFTOOLS}"
     "\n        debugging: ${USE_PERFTOOLS_DEBUG}"
+    "\ncURL:              ${USE_CURL}"
+    "\n"
     "\nDataSeries:        ${USE_DATASERIES}"
+    "\nElasticSearch:     ${USE_ELASTICSEARCH}"
     "\n"
     "\n================================================================\n"
     )
diff --git a/config.h.in b/config.h.in
index 5368d6824e..2d065f755e 100644
--- a/config.h.in
+++ b/config.h.in
@@ -114,9 +114,15 @@
 /* Analyze Mobile IPv6 traffic */
 #cmakedefine ENABLE_MOBILE_IPV6
 
+/* Use libcurl. */
+#cmakedefine USE_CURL
+
 /* Use the DataSeries writer. */
 #cmakedefine USE_DATASERIES
 
+/* Use the ElasticSearch writer. */
+#cmakedefine USE_ELASTICSEARCH
+
 /* Version number of package */
 #define VERSION "@VERSION@"
diff --git a/doc/logging-elasticsearch.rst b/doc/logging-elasticsearch.rst
new file mode 100644
index 0000000000..b6d22cf5fa
--- /dev/null
+++ b/doc/logging-elasticsearch.rst
@@ -0,0 +1,95 @@
+
+==========================================
+Indexed Logging Output with ElasticSearch
+==========================================
+
+.. rst-class:: opening
+
+    Bro's default ASCII log format is not exactly the most efficient
+    way to store and search large volumes of data. ElasticSearch is a
+    search engine built on top of Apache's Lucene project that scales
+    very well, both for distributed indexing and distributed
+    searching.
+
+.. contents::
+
+Installing ElasticSearch
+------------------------
+
+ElasticSearch requires a JRE to run. Download the latest version from
+the ElasticSearch website. Once extracted, start ElasticSearch with::
+
+    # ./bin/elasticsearch
+
+Compiling Bro with ElasticSearch Support
+----------------------------------------
+
+First, ensure that you have libcurl installed, then run configure::
+
+    # ./configure
+    [...]
+    ====================|  Bro Build Summary  |=====================
+    [...]
+    cURL:              true
+    [...]
+    ElasticSearch:     true
+    [...]
+    ================================================================
+
+Activating ElasticSearch
+------------------------
+
+The most direct way to use ElasticSearch is to switch *all* log files
+over to it. To do that, just add ``redef
+Log::default_writer=Log::WRITER_ELASTICSEARCH;`` to your ``local.bro``.
+For testing, you can also just pass that on the command line::
+
+    bro -r trace.pcap Log::default_writer=Log::WRITER_ELASTICSEARCH
+
+With that, Bro will now write all its output into ElasticSearch. You
+can inspect the indexed logs through ElasticSearch's RESTful query
+interface; see the ElasticSearch documentation for details. Rudimentary
+third-party web interfaces to ElasticSearch are also available.
+
+You can also switch only individual files over to ElasticSearch by
+adding code like this to your ``local.bro``:
+
+.. code:: bro
+
+    event bro_init()
+        {
+        local f = Log::get_filter(Conn::LOG, "default"); # Get default filter for connection log.
+        f$writer = Log::WRITER_ELASTICSEARCH;            # Change writer type.
+        Log::add_filter(Conn::LOG, f);                   # Replace filter with adapted version.
+        }
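+
+Note that replacing the default filter as above stops the ASCII version
+of that log. If you would rather keep the ASCII output and index a
+stream in ElasticSearch *in addition*, attach a second filter to the
+stream instead of modifying the default one. A minimal sketch (the
+stream and filter name here are just examples):
+
+.. code:: bro
+
+    event bro_init()
+        {
+        # An extra filter leaves the default ASCII filter in place.
+        local f: Log::Filter = [$name="es-http", $writer=Log::WRITER_ELASTICSEARCH];
+        Log::add_filter(HTTP::LOG, f);
+        }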
+
+Configuring ElasticSearch
+-------------------------
+
+Bro's ElasticSearch writer comes with a few configuration options; a
+combined example follows the list:
+
+- cluster_name: Currently unused.
+
+- server_host: Where to send the data. Default: 127.0.0.1.
+
+- server_port: What port to send the data to. Default: 9200.
+
+- index_prefix: ElasticSearch indexes are like databases in a standard
+  DB model. This is the name of the index to which to send the data.
+  Default: bro.
+
+- type_prefix: ElasticSearch types are like tables in a standard DB
+  model. This is a prefix that gets prepended to Bro log names.
+  Example: type_prefix = "bro_" would create types "bro_dns",
+  "bro_http", etc. Default: none.
+
+- max_batch_size: How many messages to buffer before sending them to
+  ElasticSearch in one bulk request. This is mainly a memory
+  optimization; changing it does not seem to affect indexing
+  performance much. Default: 1,000.
+
+- max_batch_interval: The maximum wall-clock time that may pass before
+  a partially filled buffer is sent anyway. Default: 1 minute.
+
+- max_byte_size: The maximum byte size the buffered JSON may reach
+  before it is sent to the bulk insert API. Default: 1 MB.
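+
+For example, a ``local.bro`` fragment pointing the writer at a remote
+server might look like this (the host and batch size are purely
+illustrative):
+
+.. code:: bro
+
+    redef LogElasticSearch::server_host = "10.0.0.50";
+    redef LogElasticSearch::server_port = 9200;
+    redef LogElasticSearch::type_prefix = "bro_";
+    redef LogElasticSearch::max_batch_size = 5000;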
+
+TODO
+----
+
+Lots.
+
+- Perform multicast discovery for server.
+- Better error detection.
+- Better defaults (don't index loaded-plugins, for instance).
diff --git a/doc/scripts/DocSourcesList.cmake b/doc/scripts/DocSourcesList.cmake
index c5eb3d724b..1abe6b9305 100644
--- a/doc/scripts/DocSourcesList.cmake
+++ b/doc/scripts/DocSourcesList.cmake
@@ -42,6 +42,7 @@ rest_target(${psd} base/frameworks/logging/postprocessors/scp.bro)
 rest_target(${psd} base/frameworks/logging/postprocessors/sftp.bro)
 rest_target(${psd} base/frameworks/logging/writers/ascii.bro)
 rest_target(${psd} base/frameworks/logging/writers/dataseries.bro)
+rest_target(${psd} base/frameworks/logging/writers/elasticsearch.bro)
 rest_target(${psd} base/frameworks/logging/writers/none.bro)
 rest_target(${psd} base/frameworks/metrics/cluster.bro)
 rest_target(${psd} base/frameworks/metrics/main.bro)
@@ -145,6 +146,7 @@ rest_target(${psd} policy/protocols/ssl/known-certs.bro)
 rest_target(${psd} policy/protocols/ssl/validate-certs.bro)
 rest_target(${psd} policy/tuning/defaults/packet-fragments.bro)
 rest_target(${psd} policy/tuning/defaults/warnings.bro)
+rest_target(${psd} policy/tuning/logs-to-elasticsearch.bro)
 rest_target(${psd} policy/tuning/track-all-assets.bro)
 rest_target(${psd} site/local-manager.bro)
 rest_target(${psd} site/local-proxy.bro)
diff --git a/scripts/base/frameworks/logging/__load__.bro b/scripts/base/frameworks/logging/__load__.bro
index be44a7e34f..b65cb1dea3 100644
--- a/scripts/base/frameworks/logging/__load__.bro
+++ b/scripts/base/frameworks/logging/__load__.bro
@@ -2,4 +2,5 @@
 @load ./postprocessors
 @load ./writers/ascii
 @load ./writers/dataseries
+@load ./writers/elasticsearch
 @load ./writers/none
diff --git a/scripts/base/frameworks/logging/writers/elasticsearch.bro b/scripts/base/frameworks/logging/writers/elasticsearch.bro
new file mode 100644
index 0000000000..a6a485226a
--- /dev/null
+++ b/scripts/base/frameworks/logging/writers/elasticsearch.bro
@@ -0,0 +1,46 @@
+##! Log writer for sending logs to an ElasticSearch server.
+##!
+##! Note: This module is in testing and is not yet considered stable!
+##!
+##! There is one known memory issue. If your ElasticSearch server is
+##! running slowly and taking too long to return from bulk insert
+##! requests, the message queue to the writer thread will keep growing,
+##! giving the appearance of a memory leak.
+
+module LogElasticSearch;
+
+export {
+	## Name of the ES cluster.
+	const cluster_name = "elasticsearch" &redef;
+
+	## ES server host.
+	const server_host = "127.0.0.1" &redef;
+
+	## ES server port.
+	const server_port = 9200 &redef;
+
+	## Name of the ES index.
+	const index_prefix = "bro" &redef;
+
+	## The ES type prefix comes before the name of the related log.
+	## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
+	const type_prefix = "" &redef;
+
+	## The time before an ElasticSearch transfer will time out.
+	## Note: This is not working yet!
+	const transfer_timeout = 2secs;
+
+	## The batch size is the number of messages that will be queued up before
+	## they are sent to be bulk indexed.
+	const max_batch_size = 1000 &redef;
+
+	## The maximum amount of wall-clock time that is allowed to pass without
+	## finishing a bulk log send. This represents the maximum delay you
+	## would like to have with your logs before they are sent to ElasticSearch.
+	const max_batch_interval = 1min &redef;
+
+	## The maximum byte size for a buffered JSON string to send to the bulk
+	## insert API.
+	const max_byte_size = 1024 * 1024 &redef;
+}
diff --git a/scripts/policy/tuning/logs-to-elasticsearch.bro b/scripts/policy/tuning/logs-to-elasticsearch.bro
new file mode 100644
index 0000000000..c3cc9d5002
--- /dev/null
+++ b/scripts/policy/tuning/logs-to-elasticsearch.bro
@@ -0,0 +1,45 @@
+##! Load this script to enable global log output to an ElasticSearch database.
+
+module LogElasticSearch;
+
+export {
+	## An ElasticSearch-specific rotation interval.
+	const rotation_interval = 24hr &redef;
+
+	## Optionally exclude a :bro:enum:`Log::ID` from being sent to
+	## ElasticSearch with this script.
+	const excluded_log_ids: set[string] = set("Communication::LOG") &redef;
+
+	## If you want to explicitly only send certain :bro:enum:`Log::ID`
+	## streams, add them to this set. If the set remains empty, all will
+	## be sent. The :bro:id:`excluded_log_ids` option will remain in
+	## effect as well.
+	const send_logs: set[string] = set() &redef;
+}
+
+module Log;
+
+event bro_init() &priority=-5
+	{
+	local my_filters: table[ID, string] of Filter = table();
+
+	for ( [id, name] in filters )
+		{
+		local filter = filters[id, name];
+		if ( fmt("%s", id) in LogElasticSearch::excluded_log_ids ||
+		     (|LogElasticSearch::send_logs| > 0 &&
+		      fmt("%s", id) !in LogElasticSearch::send_logs) )
+			next;
+
+		filter$name = cat(name, "-es");
+		filter$writer = Log::WRITER_ELASTICSEARCH;
+		filter$interv = LogElasticSearch::rotation_interval;
+		my_filters[id, name] = filter;
+		}
+
+	# Adding the new filters is done in a second loop so that the filters
+	# table is not modified while iterating over it, which would make the
+	# loop above never terminate.
+	for ( [id, name] in my_filters )
+		Log::add_filter(id, my_filters[id, name]);
+	}
\ No newline at end of file
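A typical way to use the tuning script above is to load it from
``local.bro`` and then narrow down which streams get shipped. A
hypothetical fragment, assuming you only want connection and HTTP logs
indexed, with hourly index rotation:

.. code:: bro

    @load tuning/logs-to-elasticsearch

    # Only these streams go to ElasticSearch; all others stay ASCII-only.
    redef LogElasticSearch::send_logs += { "Conn::LOG", "HTTP::LOG" };

    # Start a fresh ElasticSearch index every hour.
    redef LogElasticSearch::rotation_interval = 1hr;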
diff --git a/scripts/test-all-policy.bro b/scripts/test-all-policy.bro
index 415468a801..a7c43b14b3 100644
--- a/scripts/test-all-policy.bro
+++ b/scripts/test-all-policy.bro
@@ -60,4 +60,5 @@
 @load tuning/defaults/__load__.bro
 @load tuning/defaults/packet-fragments.bro
 @load tuning/defaults/warnings.bro
+@load tuning/logs-to-elasticsearch.bro
 @load tuning/track-all-assets.bro
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 50e58d87e3..ce440852d7 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -428,6 +428,7 @@ set(bro_SRCS
     logging/WriterFrontend.cc
     logging/writers/Ascii.cc
     logging/writers/DataSeries.cc
+    logging/writers/ElasticSearch.cc
     logging/writers/None.cc
 
     input/Manager.cc
diff --git a/src/logging.bif b/src/logging.bif
index 48e0edbb06..f5d3e8e3e6 100644
--- a/src/logging.bif
+++ b/src/logging.bif
@@ -82,6 +82,20 @@ const dump_schema: bool;
 const use_integer_for_time: bool;
 const num_threads: count;
 
+# Options for the ElasticSearch writer.
+
+module LogElasticSearch;
+
+const cluster_name: string;
+const server_host: string;
+const server_port: count;
+const index_prefix: string;
+const type_prefix: string;
+const transfer_timeout: interval;
+const max_batch_size: count;
+const max_batch_interval: interval;
+const max_byte_size: count;
+
 # Options for the None writer.
 
 module LogNone;
diff --git a/src/logging/Manager.cc b/src/logging/Manager.cc
index fd970c48b2..c4245680a6 100644
--- a/src/logging/Manager.cc
+++ b/src/logging/Manager.cc
@@ -18,6 +18,10 @@
 #include "writers/Ascii.h"
 #include "writers/None.h"
 
+#ifdef USE_ELASTICSEARCH
+#include "writers/ElasticSearch.h"
+#endif
+
 #ifdef USE_DATASERIES
 #include "writers/DataSeries.h"
 #endif
@@ -36,6 +40,11 @@ struct WriterDefinition {
 WriterDefinition log_writers[] = {
 	{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
 	{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
+
+#ifdef USE_ELASTICSEARCH
+	{ BifEnum::Log::WRITER_ELASTICSEARCH, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
+#endif
+
 #ifdef USE_DATASERIES
 	{ BifEnum::Log::WRITER_DATASERIES, "DataSeries", 0, writer::DataSeries::Instantiate },
 #endif
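For context on the registration above: once the writer is compiled in, any
log filter can select it via ``Log::WRITER_ELASTICSEARCH``, and the writer
derives index names from the filter's rotation settings (see
``UpdateIndex()`` in the implementation below, which appends the rotation
interval's start time to ``index_prefix``, yielding names such as
``bro-201207201400``). A sketch of setting a per-stream rotation interval
directly, without the tuning script (the DNS stream and the 30-minute
interval are just examples):

.. code:: bro

    event bro_init()
        {
        local f = Log::get_filter(DNS::LOG, "default");
        f$writer = Log::WRITER_ELASTICSEARCH;
        f$interv = 30min;   # New ElasticSearch index every 30 minutes.
        Log::add_filter(DNS::LOG, f);
        }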
diff --git a/src/logging/writers/ElasticSearch.cc b/src/logging/writers/ElasticSearch.cc
new file mode 100644
index 0000000000..b7edcf6aa6
--- /dev/null
+++ b/src/logging/writers/ElasticSearch.cc
@@ -0,0 +1,415 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+//
+// This is experimental code that is not yet ready for production usage.
+//
+
+#include "config.h"
+
+#ifdef USE_ELASTICSEARCH
+
+#include <string>
+#include <stdint.h>
+
+#include "util.h"
+#include "BroString.h"
+
+#include "NetVar.h"
+#include "threading/SerialTypes.h"
+
+#include <curl/curl.h>
+#include <curl/easy.h>
+
+#include "ElasticSearch.h"
+
+using namespace logging;
+using namespace writer;
+using threading::Value;
+using threading::Field;
+
+ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
+	{
+	cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
+	cluster_name = new char[cluster_name_len + 1];
+	memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
+	cluster_name[cluster_name_len] = 0;
+
+	index_prefix = string((const char*) BifConst::LogElasticSearch::index_prefix->Bytes(), BifConst::LogElasticSearch::index_prefix->Len());
+
+	es_server = string(Fmt("http://%s:%d", BifConst::LogElasticSearch::server_host->Bytes(),
+	                       (int) BifConst::LogElasticSearch::server_port));
+	bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));
+
+	http_headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
+	buffer.Clear();
+	counter = 0;
+	current_index = string();
+	prev_index = string();
+	last_send = current_time();
+	failing = false;
+
+	// Script-land uses an interval in seconds; curl wants milliseconds.
+	transfer_timeout = BifConst::LogElasticSearch::transfer_timeout * 1000;
+
+	curl_handle = HTTPSetup();
+	}
+
+ElasticSearch::~ElasticSearch()
+	{
+	delete [] cluster_name;
+	}
+
+bool ElasticSearch::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
+	{
+	return true;
+	}
+
+bool ElasticSearch::DoFlush(double network_time)
+	{
+	BatchIndex();
+	return true;
+	}
+
+bool ElasticSearch::DoFinish(double network_time)
+	{
+	BatchIndex();
+	curl_slist_free_all(http_headers);
+	curl_easy_cleanup(curl_handle);
+	return true;
+	}
+
+bool ElasticSearch::BatchIndex()
+	{
+	curl_easy_reset(curl_handle);
+	curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
+	curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) buffer.Len());
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
+	failing = ! HTTPSend(curl_handle);
+
+	// We are currently throwing the data out regardless of whether the
+	// send failed. Fire and forget!
+	buffer.Clear();
+	counter = 0;
+	last_send = current_time();
+
+	return true;
+	}
+bool ElasticSearch::AddValueToBuffer(ODesc* b, Value* val)
+	{
+	switch ( val->type )
+		{
+		// ES treats 0 as false and any other value as true, so bool types
+		// go through the integer path here.
+		case TYPE_BOOL:
+		case TYPE_INT:
+			b->Add(val->val.int_val);
+			break;
+
+		case TYPE_COUNT:
+		case TYPE_COUNTER:
+			{
+			// ElasticSearch doesn't seem to support unsigned 64bit ints.
+			if ( val->val.uint_val >= INT64_MAX )
+				{
+				Error(Fmt("count value too large: %" PRIu64, val->val.uint_val));
+				b->AddRaw("null", 4);
+				}
+			else
+				b->Add(val->val.uint_val);
+			break;
+			}
+
+		case TYPE_PORT:
+			b->Add(val->val.port_val.port);
+			break;
+
+		case TYPE_SUBNET:
+			b->AddRaw("\"", 1);
+			b->Add(Render(val->val.subnet_val));
+			b->AddRaw("\"", 1);
+			break;
+
+		case TYPE_ADDR:
+			b->AddRaw("\"", 1);
+			b->Add(Render(val->val.addr_val));
+			b->AddRaw("\"", 1);
+			break;
+
+		case TYPE_DOUBLE:
+		case TYPE_INTERVAL:
+			b->Add(val->val.double_val);
+			break;
+
+		case TYPE_TIME:
+			{
+			// ElasticSearch uses milliseconds for timestamps, and JSON
+			// only supports signed ints (uints can be too large).
+			uint64_t ts = (uint64_t) (val->val.double_val * 1000);
+			if ( ts >= INT64_MAX )
+				{
+				Error(Fmt("time value too large: %" PRIu64, ts));
+				b->AddRaw("null", 4);
+				}
+			else
+				b->Add(ts);
+			break;
+			}
+
+		case TYPE_ENUM:
+		case TYPE_STRING:
+		case TYPE_FILE:
+		case TYPE_FUNC:
+			{
+			b->AddRaw("\"", 1);
+			for ( int i = 0; i < val->val.string_val.length; ++i )
+				{
+				char c = val->val.string_val.data[i];
+				// Escape control and other special characters as 2-byte
+				// Unicode escapes (\u00xx).
+				if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
+					{
+					static const char hex_chars[] = "0123456789abcdef";
+					b->AddRaw("\\u00", 4);
+					b->AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
+					b->AddRaw(&hex_chars[c & 0x0f], 1);
+					}
+				else
+					b->AddRaw(&c, 1);
+				}
+			b->AddRaw("\"", 1);
+			break;
+			}
+
+		case TYPE_TABLE:
+			{
+			b->AddRaw("[", 1);
+			for ( int j = 0; j < val->val.set_val.size; j++ )
+				{
+				if ( j > 0 )
+					b->AddRaw(",", 1);
+				AddValueToBuffer(b, val->val.set_val.vals[j]);
+				}
+			b->AddRaw("]", 1);
+			break;
+			}
+
+		case TYPE_VECTOR:
+			{
+			b->AddRaw("[", 1);
+			for ( int j = 0; j < val->val.vector_val.size; j++ )
+				{
+				if ( j > 0 )
+					b->AddRaw(",", 1);
+				AddValueToBuffer(b, val->val.vector_val.vals[j]);
+				}
+			b->AddRaw("]", 1);
+			break;
+			}
+
+		default:
+			return false;
+		}
+
+	return true;
+	}
+
+bool ElasticSearch::AddFieldToBuffer(ODesc* b, Value* val, const Field* field)
+	{
+	if ( ! val->present )
+		return false;
+
+	b->AddRaw("\"", 1);
+	b->Add(field->name);
+	b->AddRaw("\":", 2);
+	AddValueToBuffer(b, val);
+	return true;
+	}
+
+bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
+			    Value** vals)
+	{
+	if ( current_index.empty() )
+		UpdateIndex(network_time, Info().rotation_interval, Info().rotation_base);
+
+	// Our action line looks like:
+	//   {"index":{"_index":"<current_index>","_type":"<path>"}}
+	buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
+	buffer.Add(current_index);
+	buffer.AddRaw("\",\"_type\":\"", 11);
+	buffer.Add(Info().path);
+	buffer.AddRaw("\"}}\n", 4);
+
+	buffer.AddRaw("{", 1);
+	for ( int i = 0; i < num_fields; i++ )
+		{
+		// Add a separator before the next field unless the last emitted
+		// character is already a comma; unset fields add nothing.
+		if ( i > 0 && buffer.Bytes()[buffer.Len() - 1] != ',' && vals[i]->present )
+			buffer.AddRaw(",", 1);
+		AddFieldToBuffer(&buffer, vals[i], fields[i]);
+		}
+	buffer.AddRaw("}\n", 2);
+
+	counter++;
+	if ( counter >= BifConst::LogElasticSearch::max_batch_size ||
+	     uint(buffer.Len()) >= BifConst::LogElasticSearch::max_byte_size )
+		BatchIndex();
+
+	return true;
+	}
+
+bool ElasticSearch::UpdateIndex(double now, double rinterval, double rbase)
+	{
+	if ( rinterval == 0 )
+		{
+		// If logs aren't being rotated, don't use a rotation-oriented index name.
+		current_index = index_prefix;
+		}
+	else
+		{
+		double nr = calc_next_rotate(now, rinterval, rbase);
+		double interval_beginning = now - (rinterval - nr);
+
+		struct tm tm;
+		char buf[128];
+		time_t teatime = (time_t) interval_beginning;
+		localtime_r(&teatime, &tm);
+		strftime(buf, sizeof(buf), "%Y%m%d%H%M", &tm);
+
+		prev_index = current_index;
+		current_index = index_prefix + "-" + buf;
+
+		// Send some metadata about this index.
+		buffer.AddRaw("{\"index\":{\"_index\":\"@", 21);
+		buffer.Add(index_prefix);
+		buffer.AddRaw("-meta\",\"_type\":\"index\",\"_id\":\"", 30);
+		buffer.Add(current_index);
+		buffer.AddRaw("-", 1);
+		buffer.Add(Info().rotation_base);
+		buffer.AddRaw("-", 1);
+		buffer.Add(Info().rotation_interval);
+		buffer.AddRaw("\"}}\n{\"name\":\"", 13);
+		buffer.Add(current_index);
+		buffer.AddRaw("\",\"start\":", 10);
+		buffer.Add(interval_beginning);
+		buffer.AddRaw(",\"end\":", 7);
+		buffer.Add(interval_beginning + rinterval);
+		buffer.AddRaw("}\n", 2);
+		}
+
+	//printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
+	return true;
+	}
+bool ElasticSearch::DoRotate(const char* rotated_path, double open, double close, bool terminating)
+	{
+	// Update the currently used index to the new rotation interval.
+	UpdateIndex(close, Info().rotation_interval, Info().rotation_base);
+
+	// Only do this stuff if there was a previous index.
+	if ( ! prev_index.empty() )
+		{
+		// FIXME: I think this section is taking too long and causing the thread to die.
+
+		// Compress the previous index.
+		//curl_easy_reset(curl_handle);
+		//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings", es_server.c_str(), prev_index.c_str()));
+		//curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
+		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, "{\"index\":{\"store.compress.stored\":\"true\"}}");
+		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) 42);
+		//HTTPSend(curl_handle);
+
+		// Optimize the previous index.
+		// TODO: make this into variables.
+		//curl_easy_reset(curl_handle);
+		//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
+		//HTTPSend(curl_handle);
+		}
+
+	if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) )
+		Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
+
+	return true;
+	}
+
+bool ElasticSearch::DoSetBuf(bool enabled)
+	{
+	// Nothing to do.
+	return true;
+	}
+
+bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
+	{
+	if ( last_send > 0 && buffer.Len() > 0 &&
+	     current_time - last_send > BifConst::LogElasticSearch::max_batch_interval )
+		BatchIndex();
+
+	return true;
+	}
+
+CURL* ElasticSearch::HTTPSetup()
+	{
+	CURL* handle = curl_easy_init();
+	if ( ! handle )
+		{
+		Error("cURL did not initialize correctly.");
+		return 0;
+		}
+
+	return handle;
+	}
+
+size_t ElasticSearch::HTTPReceive(void* ptr, size_t size, size_t nmemb, void* userdata)
+	{
+	// TODO: Do some verification on the result?
+	// Returning size * nmemb tells libcurl that the whole response body
+	// was consumed (it is simply discarded here).
+	return size * nmemb;
+	}
+
+bool ElasticSearch::HTTPSend(CURL* handle)
+	{
+	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
+	// This gets called with the response body.
+	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive);
+
+	// HTTP 1.1 likes to use chunked encoded transfers, which aren't good
+	// for speed. The best (only?) way to disable that is to just use
+	// HTTP 1.0.
+	curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
+
+	//curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, transfer_timeout);
+
+	CURLcode return_code = curl_easy_perform(handle);
+
+	switch ( return_code )
+		{
+		case CURLE_COULDNT_CONNECT:
+		case CURLE_COULDNT_RESOLVE_HOST:
+		case CURLE_WRITE_ERROR:
+		case CURLE_RECV_ERROR:
+			if ( ! failing )
+				Error(Fmt("ElasticSearch server may not be accessible."));
+			break;
+
+		case CURLE_OPERATION_TIMEDOUT:
+			if ( ! failing )
+				Warning(Fmt("HTTP operation with ElasticSearch server timed out at %" PRIu64 " msecs.", transfer_timeout));
+			break;
+
+		case CURLE_OK:
+			{
+			// CURLINFO_RESPONSE_CODE writes a long, not an int/uint.
+			long http_code = 0;
+			curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &http_code);
+			if ( http_code == 200 )
+				// Hopefully everything goes through here.
+				return true;
+			else if ( ! failing )
+				Error(Fmt("Received a non-successful status code back from ElasticSearch server, check the ElasticSearch server log."));
+			break;
+			}
+
+		default:
+			break;
+		}
+
+	// The "successful" return happens above.
+	return false;
+	}
+
+#endif
diff --git a/src/logging/writers/ElasticSearch.h b/src/logging/writers/ElasticSearch.h
new file mode 100644
index 0000000000..0d863f2f19
--- /dev/null
+++ b/src/logging/writers/ElasticSearch.h
@@ -0,0 +1,81 @@
+// See the file "COPYING" in the main distribution directory for copyright.
+//
+// Log writer for writing to an ElasticSearch database.
+//
+// This is experimental code that is not yet ready for production usage.
+//
+
+#ifndef LOGGING_WRITER_ELASTICSEARCH_H
+#define LOGGING_WRITER_ELASTICSEARCH_H
+
+#include <curl/curl.h>
+#include "../WriterBackend.h"
+
+namespace logging { namespace writer {
+
+class ElasticSearch : public WriterBackend {
+public:
+	ElasticSearch(WriterFrontend* frontend);
+	~ElasticSearch();
+
+	static WriterBackend* Instantiate(WriterFrontend* frontend)
+		{ return new ElasticSearch(frontend); }
+	static string LogExt();
+
+protected:
+	// Overridden from WriterBackend.
+	virtual bool DoInit(const WriterInfo& info, int num_fields,
+			    const threading::Field* const* fields);
+	virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
+			     threading::Value** vals);
+	virtual bool DoSetBuf(bool enabled);
+	virtual bool DoRotate(const char* rotated_path, double open,
+			      double close, bool terminating);
+	virtual bool DoFlush(double network_time);
+	virtual bool DoFinish(double network_time);
+	virtual bool DoHeartbeat(double network_time, double current_time);
+
+private:
+	bool AddFieldToBuffer(ODesc* b, threading::Value* val, const threading::Field* field);
+	bool AddValueToBuffer(ODesc* b, threading::Value* val);
+	bool BatchIndex();
+	bool SendMappings();
+	bool UpdateIndex(double now, double rinterval, double rbase);
+
+	CURL* HTTPSetup();
+	// Static so it can serve as a plain C callback for libcurl.
+	static size_t HTTPReceive(void* ptr, size_t size, size_t nmemb, void* userdata);
+	bool HTTPSend(CURL* handle);
+
+	// Buffers, etc.
+	ODesc buffer;
+	uint64 counter;
+	double last_send;
+	string current_index;
+	string prev_index;
+
+	CURL* curl_handle;
+
+	// From scripts.
+	char* cluster_name;
+	int cluster_name_len;
+
+	string es_server;
+	string bulk_url;
+
+	struct curl_slist* http_headers;
+
+	string path;
+	string index_prefix;
+	uint64 transfer_timeout;
+	bool failing;
+
+	uint64 batch_size;
+};
+
+}
+}
+
+#endif
diff --git a/src/main.cc b/src/main.cc
index d3937b3449..407f67c9af 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -12,6 +12,10 @@
 #include
 #endif
 
+#ifdef USE_CURL
+#include <curl/curl.h>
+#endif
+
 #ifdef USE_IDMEF
 extern "C" {
 #include
@@ -712,6 +716,10 @@ int main(int argc, char** argv)
 	SSL_library_init();
 	SSL_load_error_strings();
 
+#ifdef USE_CURL
+	curl_global_init(CURL_GLOBAL_ALL);
+#endif
+
 	// FIXME: On systems that don't provide /dev/urandom, OpenSSL doesn't
 	// seed the PRNG. We should do this here (but at least Linux, FreeBSD
 	// and Solaris provide /dev/urandom).
@@ -1062,6 +1070,10 @@
 	done_with_network();
 	net_delete();
 
+#ifdef USE_CURL
+	curl_global_cleanup();
+#endif
+
 	terminate_bro();
 
 	// Close files after net_delete(), because net_delete()
diff --git a/src/types.bif b/src/types.bif
index 033ee975a0..92cc8db551 100644
--- a/src/types.bif
+++ b/src/types.bif
@@ -163,6 +163,7 @@ enum Writer %{
 	WRITER_NONE,
 	WRITER_ASCII,
 	WRITER_DATASERIES,
+	WRITER_ELASTICSEARCH,
 %}
 
 enum ID %{
diff --git a/src/util.h b/src/util.h
index 075c2af7c2..a695c6df6a 100644
--- a/src/util.h
+++ b/src/util.h
@@ -13,6 +13,7 @@
 // Expose C99 functionality from inttypes.h, which would otherwise not be
 // available in C++.
 #define __STDC_FORMAT_MACROS
+#define __STDC_LIMIT_MACROS
 #include <inttypes.h>
 
 #if __STDC__
diff --git a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log
index 8f90296b63..ca8749956f 100644
--- a/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log
+++ b/testing/btest/Baseline/coverage.bare-load-baseline/canonified_loaded_scripts.log
@@ -3,7 +3,7 @@
 #empty_field	(empty)
 #unset_field	-
 #path	loaded_scripts
-#start	2012-07-20-01-49-31
+#start	2012-07-20-14-34-11
 #fields	name
 #types	string
 scripts/base/init-bare.bro
@@ -21,6 +21,7 @@ scripts/base/init-bare.bro
   scripts/base/frameworks/logging/./postprocessors/./sftp.bro
   scripts/base/frameworks/logging/./writers/ascii.bro
   scripts/base/frameworks/logging/./writers/dataseries.bro
+  scripts/base/frameworks/logging/./writers/elasticsearch.bro
   scripts/base/frameworks/logging/./writers/none.bro
   scripts/base/frameworks/input/__load__.bro
   scripts/base/frameworks/input/./main.bro
@@ -29,4 +30,4 @@ scripts/base/init-bare.bro
   scripts/base/frameworks/input/./readers/raw.bro
   scripts/base/frameworks/input/./readers/benchmark.bro
 scripts/policy/misc/loaded-scripts.bro
-#end	2012-07-20-01-49-31
+#end	2012-07-20-14-34-11
diff --git a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
index 6bc461ed65..b464c916f2 100644
--- a/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
+++ b/testing/btest/Baseline/coverage.default-load-baseline/canonified_loaded_scripts.log
@@ -3,7 +3,7 @@
 #empty_field	(empty)
 #unset_field	-
 #path	loaded_scripts
-#start	2012-07-20-01-49-33
+#start	2012-07-20-14-34-40
 #fields	name
 #types	string
 scripts/base/init-bare.bro
@@ -21,6 +21,7 @@ scripts/base/init-bare.bro
   scripts/base/frameworks/logging/./postprocessors/./sftp.bro
   scripts/base/frameworks/logging/./writers/ascii.bro
   scripts/base/frameworks/logging/./writers/dataseries.bro
+  scripts/base/frameworks/logging/./writers/elasticsearch.bro
   scripts/base/frameworks/logging/./writers/none.bro
   scripts/base/frameworks/input/__load__.bro
   scripts/base/frameworks/input/./main.bro
@@ -109,4 +110,4 @@ scripts/base/init-default.bro
   scripts/base/protocols/syslog/./consts.bro
   scripts/base/protocols/syslog/./main.bro
 scripts/policy/misc/loaded-scripts.bro
-#end	2012-07-20-01-49-33
+#end	2012-07-20-14-34-40