diff --git a/scripts/base/frameworks/logging/__load__.bro b/scripts/base/frameworks/logging/__load__.bro
index 2c2a6d2f59..b65cb1dea3 100644
--- a/scripts/base/frameworks/logging/__load__.bro
+++ b/scripts/base/frameworks/logging/__load__.bro
@@ -2,4 +2,5 @@
 @load ./postprocessors
 @load ./writers/ascii
 @load ./writers/dataseries
-@load ./writers/elasticsearch@load ./writers/none
+@load ./writers/elasticsearch
+@load ./writers/none
diff --git a/scripts/base/frameworks/logging/writers/elasticsearch.bro b/scripts/base/frameworks/logging/writers/elasticsearch.bro
index b262201c85..93c6c98705 100644
--- a/scripts/base/frameworks/logging/writers/elasticsearch.bro
+++ b/scripts/base/frameworks/logging/writers/elasticsearch.bro
@@ -11,7 +11,7 @@ export {
 	const server_port = 9200 &redef;
 
 	## Name of the ES index
-	const index_name = "bro" &redef;
+	const index_prefix = "bro" &redef;
 
 	## The ES type prefix comes before the name of the related log.
 	## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
diff --git a/src/logging.bif b/src/logging.bif
index 23b9378b26..3cdb414d80 100644
--- a/src/logging.bif
+++ b/src/logging.bif
@@ -89,7 +89,7 @@ module LogElasticSearch;
 const cluster_name: string;
 const server_host: string;
 const server_port: count;
-const index_name: string;
+const index_prefix: string;
 const type_prefix: string;
 const max_batch_size: count;
 const max_batch_interval: interval;
diff --git a/src/logging/writers/ElasticSearch.cc b/src/logging/writers/ElasticSearch.cc
index 75a4e0514f..6d2f8363cc 100644
--- a/src/logging/writers/ElasticSearch.cc
+++ b/src/logging/writers/ElasticSearch.cc
@@ -30,8 +30,17 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
 	memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
 	cluster_name[cluster_name_len] = 0;
 
+	index_prefix = string((const char*) BifConst::LogElasticSearch::index_prefix->Bytes(), BifConst::LogElasticSearch::index_prefix->Len());
+
+	es_server = string(Fmt("http://%s:%d", BifConst::LogElasticSearch::server_host->Bytes(),
+	                       (int) BifConst::LogElasticSearch::server_port));
+	bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));
+
+	http_headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
 	buffer.Clear();
 	counter = 0;
+	current_index = string();
+	prev_index = string();
 	last_send = current_time();
 
 	curl_handle = HTTPSetup();
@@ -42,67 +51,84 @@ ElasticSearch::~ElasticSearch()
 	delete [] cluster_name;
 	}
 
-bool ElasticSearch::DoInit(string path, int num_fields, const Field* const * fields)
+bool ElasticSearch::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
 	{
-	//TODO: Determine what, if anything, needs to be done here.
 	return true;
 	}
 
 bool ElasticSearch::DoFlush()
 	{
+	BatchIndex();
 	return true;
 	}
 
 bool ElasticSearch::DoFinish()
 	{
 	BatchIndex();
+	curl_slist_free_all(http_headers);
 	curl_easy_cleanup(curl_handle);
 	return WriterBackend::DoFinish();
 	}
 
 bool ElasticSearch::BatchIndex()
 	{
-	HTTPSend();
+	curl_easy_reset(curl_handle);
+	curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
+	curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
+	HTTPSend(curl_handle);
+
 	buffer.Clear();
 	counter = 0;
 	last_send = current_time();
+
 	return true;
 	}
 
-bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
+bool ElasticSearch::AddValueToBuffer(ODesc* b, Value* val)
 	{
 	switch ( val->type )
 		{
 		// ES treats 0 as false and any other value as true so bool types go here.
 		case TYPE_BOOL:
 		case TYPE_INT:
-			buffer.Add(val->val.int_val);
+			b->Add(val->val.int_val);
 			break;
 
 		case TYPE_COUNT:
 		case TYPE_COUNTER:
-			buffer.Add(val->val.uint_val);
+			{
+			// ElasticSearch doesn't seem to support unsigned 64bit ints.
+			if ( val->val.uint_val >= INT64_MAX )
+				{
+				Error(Fmt("count value too large: %" PRIu64, val->val.uint_val));
+				b->AddRaw("null", 4);
+				}
+			else
+				b->Add(val->val.uint_val);
 			break;
+			}
 
 		case TYPE_PORT:
-			buffer.Add(val->val.port_val.port);
+			b->Add(val->val.port_val.port);
 			break;
 
 		case TYPE_SUBNET:
-			buffer.AddRaw("\"", 1);
-			buffer.Add(Render(val->val.subnet_val));
-			buffer.AddRaw("\"", 1);
+			b->AddRaw("\"", 1);
+			b->Add(Render(val->val.subnet_val));
+			b->AddRaw("\"", 1);
 			break;
 
 		case TYPE_ADDR:
-			buffer.AddRaw("\"", 1);
-			buffer.Add(Render(val->val.addr_val));
-			buffer.AddRaw("\"", 1);
+			b->AddRaw("\"", 1);
+			b->Add(Render(val->val.addr_val));
+			b->AddRaw("\"", 1);
 			break;
 
 		case TYPE_DOUBLE:
 		case TYPE_INTERVAL:
-			buffer.Add(val->val.double_val);
+			b->Add(val->val.double_val);
 			break;
 
 		case TYPE_TIME:
@@ -113,10 +139,10 @@ bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
 			if ( ts >= INT64_MAX )
 				{
 				Error(Fmt("time value too large: %" PRIu64, ts));
-				buffer.AddRaw("null", 4);
+				b->AddRaw("null", 4);
 				}
 			else
-				buffer.Add(ts);
+				b->Add(ts);
 			break;
 			}
 
@@ -125,51 +151,48 @@ bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
 		case TYPE_FILE:
 		case TYPE_FUNC:
 			{
-			buffer.AddRaw("\"", 1);
+			b->AddRaw("\"", 1);
 			for ( uint i = 0; i < val->val.string_val->size(); ++i )
 				{
 				char c = val->val.string_val->data()[i];
-				// HTML entity encode special characters.
+				// 2byte Unicode escape special characters.
 				if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
 					{
 					static const char hex_chars[] = "0123456789abcdef";
-					buffer.AddRaw("\\u00", 4);
-					buffer.AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
-					buffer.AddRaw(&hex_chars[c & 0x0f], 1);
-					//buffer.AddRaw("&#//", 2);
-					//buffer.Add((uint8_t) c);
-					//buffer.AddRaw(";", 1);
+					b->AddRaw("\\u00", 4);
+					b->AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
+					b->AddRaw(&hex_chars[c & 0x0f], 1);
 					}
 				else
-					buffer.AddRaw(&c, 1);
+					b->AddRaw(&c, 1);
 				}
-			buffer.AddRaw("\"", 1);
+			b->AddRaw("\"", 1);
 			break;
 			}
 
 		case TYPE_TABLE:
 			{
-			buffer.AddRaw("[", 1);
+			b->AddRaw("[", 1);
 			for ( int j = 0; j < val->val.set_val.size; j++ )
 				{
 				if ( j > 0 )
-					buffer.AddRaw(",", 1);
-				AddFieldValueToBuffer(val->val.set_val.vals[j], field);
+					b->AddRaw(",", 1);
+				AddValueToBuffer(b, val->val.set_val.vals[j]);
 				}
-			buffer.AddRaw("]", 1);
+			b->AddRaw("]", 1);
 			break;
 			}
 
 		case TYPE_VECTOR:
 			{
-			buffer.AddRaw("[", 1);
+			b->AddRaw("[", 1);
 			for ( int j = 0; j < val->val.vector_val.size; j++ )
 				{
 				if ( j > 0 )
-					buffer.AddRaw(",", 1);
-				AddFieldValueToBuffer(val->val.vector_val.vals[j], field);
+					b->AddRaw(",", 1);
+				AddValueToBuffer(b, val->val.vector_val.vals[j]);
 				}
-			buffer.AddRaw("]", 1);
+			b->AddRaw("]", 1);
 			break;
 			}
 
@@ -179,38 +202,37 @@ bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
 	return true;
 	}
 
-bool ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
+bool ElasticSearch::AddFieldToBuffer(ODesc *b, Value* val, const Field* field)
 	{
 	if ( ! val->present )
 		return false;
 
-	buffer.AddRaw("\"", 1);
-	buffer.Add(field->name);
-	buffer.AddRaw("\":", 2);
-	AddFieldValueToBuffer(val, field);
+	b->AddRaw("\"", 1);
+	b->Add(field->name);
+	b->AddRaw("\":", 2);
+	AddValueToBuffer(b, val);
 
 	return true;
 	}
 
 bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
 			    Value** vals)
 	{
+	if ( current_index.empty() )
+		UpdateIndex(network_time, Info().rotation_interval, Info().rotation_base);
+
 	// Our action line looks like:
-	//   {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n
 	buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
-	buffer.AddN((const char*) BifConst::LogElasticSearch::index_name->Bytes(),
-	            BifConst::LogElasticSearch::index_name->Len());
+	buffer.Add(current_index);
 	buffer.AddRaw("\",\"_type\":\"", 11);
-	buffer.AddN((const char*) BifConst::LogElasticSearch::type_prefix->Bytes(),
-	            BifConst::LogElasticSearch::type_prefix->Len());
-	buffer.Add(Path());
-	buffer.AddRaw("\"}\n", 3);
+	buffer.Add(Info().path);
+	buffer.AddRaw("\"}}\n", 4);
 
 	buffer.AddRaw("{", 1);
 	for ( int i = 0; i < num_fields; i++ )
 		{
 		if ( i > 0 && buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present )
 			buffer.AddRaw(",", 1);
-		AddFieldToBuffer(vals[i], fields[i]);
+		AddFieldToBuffer(&buffer, vals[i], fields[i]);
 		}
 	buffer.AddRaw("}\n", 2);
@@ -221,10 +243,63 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields, Value** vals)
 
 	return true;
 	}
 
+bool ElasticSearch::UpdateIndex(double now, double rinterval, double rbase)
+	{
+	if ( rinterval == 0 )
+		{
+		// if logs aren't being rotated, don't use a rotation oriented index name.
+		current_index = index_prefix;
+		}
+	else
+		{
+		double nr = calc_next_rotate(now, rinterval, rbase);
+		double interval_beginning = now - (rinterval - nr);
+
+		struct tm tm;
+		char buf[128];
+		time_t teatime = (time_t)interval_beginning;
+		gmtime_r(&teatime, &tm);
+		strftime(buf, sizeof(buf), "%Y%m%d%H%M", &tm);
+
+		prev_index = current_index;
+		current_index = index_prefix + "-" + buf;
+		}
+
+	//printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
+	return true;
+	}
+
 bool ElasticSearch::DoRotate(string rotated_path, double open, double close, bool terminating)
 	{
-	//TODO: Determine what, if anything, needs to be done here.
+	// Update the currently used index to the new rotation interval.
+	UpdateIndex(close, Info().rotation_interval, Info().rotation_base);
+
+	// Only do this stuff if there was a previous index.
+	if ( ! prev_index.empty() )
+		{
+		// FIXME: I think this section is taking too long and causing the thread to die.
+
+		// Compress the previous index
+		//curl_easy_reset(curl_handle);
+		//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings", es_server.c_str(), prev_index.c_str()));
+		//curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
+		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, "{\"index\":{\"store.compress.stored\":\"true\"}}");
+		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) 42);
+		//HTTPSend(curl_handle);
+
+		// Optimize the previous index.
+		// TODO: make this into variables.
+		//curl_easy_reset(curl_handle);
+		//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
+		//HTTPSend(curl_handle);
+		}
+
+	//if ( ! FinishedRotation(current_index, prev_index, open, close, terminating) )
+	//	{
+	//	Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
+	//	}
 
 	return true;
 	}
 
@@ -237,7 +312,7 @@ bool ElasticSearch::DoSetBuf(bool enabled)
 
 bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
 	{
-	if ( last_send > 0 &&
+	if ( last_send > 0 && buffer.Len() > 0 &&
 	     current_time-last_send > BifConst::LogElasticSearch::max_batch_interval )
 		{
 		BatchIndex();
@@ -247,31 +322,15 @@ bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
 	}
 
-// HTTP Functions start here.
-
 CURL* ElasticSearch::HTTPSetup()
 	{
-	const char *URL = fmt("http://%s:%d/_bulk", BifConst::LogElasticSearch::server_host->CheckString(),
-	                      (int) BifConst::LogElasticSearch::server_port);;
-	CURL* handle;
-	struct curl_slist *headers=NULL;
-
-	handle = curl_easy_init();
+	CURL* handle = curl_easy_init();
 	if ( ! handle )
-		return handle;
+		{
+		Error("cURL did not initialize correctly.");
+		return 0;
+		}
 
-	//sprintf(URL, "http://%s:%d/_bulk", BifConst::LogElasticSearch::server_host->CheckString(), (int) BifConst::LogElasticSearch::server_port);
-	curl_easy_setopt(handle, CURLOPT_URL, URL);
-
-	headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
-	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
-
-	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
-	curl_easy_setopt(handle, CURLOPT_POST, 1); // All requests are POSTs
-
-	// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed. The best (only?) way to disable that is to
-	// just use HTTP 1.0
-	curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
 	return handle;
 	}
 
@@ -281,14 +340,16 @@ bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
 	return true;
 	}
 
-bool ElasticSearch::HTTPSend()
+bool ElasticSearch::HTTPSend(CURL *handle)
 	{
-	CURLcode return_code;
+	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
+	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
+	// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed.
+	// The best (only?) way to disable that is to just use HTTP 1.0
+	curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
 
-	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, buffer.Len());
-	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
+	CURLcode return_code = curl_easy_perform(handle);
 
-	return_code = curl_easy_perform(curl_handle);
 	switch ( return_code )
 		{
 		case CURLE_COULDNT_CONNECT:
@@ -296,6 +357,16 @@ bool ElasticSearch::HTTPSend()
 		case CURLE_WRITE_ERROR:
 			return false;
 
+		case CURLE_OK:
+			{
+			uint http_code = 0;
+			curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
+			if ( http_code != 200 )
+				Error(Fmt("Received a non-successful status code back from ElasticSearch server."));
+
+			return true;
+			}
+
 		default:
 			return true;
 		}
diff --git a/src/logging/writers/ElasticSearch.h b/src/logging/writers/ElasticSearch.h
index bd1351214b..375845b002 100644
--- a/src/logging/writers/ElasticSearch.h
+++ b/src/logging/writers/ElasticSearch.h
@@ -22,8 +22,8 @@ public:
 
 protected:
 	// Overidden from WriterBackend.
-	virtual bool DoInit(string path, int num_fields,
-			    const threading::Field* const * fields);
+	virtual bool DoInit(const WriterInfo& info, int num_fields,
+			    const threading::Field* const* fields);
 
 	virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
 			     threading::Value** vals);
@@ -35,18 +35,22 @@ protected:
 	virtual bool DoHeartbeat(double network_time, double current_time);
 
 private:
-	bool AddFieldToBuffer(threading::Value* val, const threading::Field* field);
-	bool AddFieldValueToBuffer(threading::Value* val, const threading::Field* field);
+	bool AddFieldToBuffer(ODesc *b, threading::Value* val, const threading::Field* field);
+	bool AddValueToBuffer(ODesc *b, threading::Value* val);
 	bool BatchIndex();
+	bool SendMappings();
+	bool UpdateIndex(double now, double rinterval, double rbase);
 
 	CURL* HTTPSetup();
 	bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
-	bool HTTPSend();
+	bool HTTPSend(CURL *handle);
 
 	// Buffers, etc.
 	ODesc buffer;
 	uint64 counter;
 	double last_send;
+	string current_index;
+	string prev_index;
 
 	CURL* curl_handle;
@@ -54,6 +58,14 @@ private:
 	char* cluster_name;
 	int cluster_name_len;
 
+	string es_server;
+	string bulk_url;
+
+	struct curl_slist *http_headers;
+
+	string path;
+	string index_prefix;
+
 	uint64 batch_size;
 };