diff --git a/scripts/base/frameworks/logging/writers/elasticsearch.bro b/scripts/base/frameworks/logging/writers/elasticsearch.bro index e2d14a68e3..b262201c85 100644 --- a/scripts/base/frameworks/logging/writers/elasticsearch.bro +++ b/scripts/base/frameworks/logging/writers/elasticsearch.bro @@ -20,11 +20,15 @@ export { ## The batch size is the number of messages that will be queued up before ## they are sent to be bulk indexed. ## Note: this is mainly a memory usage parameter. - const batch_size = 1000 &redef; + const max_batch_size = 1000 &redef; ## The maximum amount of wall-clock time that is allowed to pass without ## finishing a bulk log send. This represents the maximum delay you ## would like to have with your logs before they show up in ElasticSearch. const max_batch_interval = 1min &redef; + + ## The maximum byte size for a buffered JSON string to send to the bulk + ## insert API. + const max_byte_size = 1024 * 1024 &redef; } diff --git a/src/logging.bif b/src/logging.bif index 5434ac3705..cbae66efdb 100644 --- a/src/logging.bif +++ b/src/logging.bif @@ -91,5 +91,6 @@ const server_host: string; const server_port: count; const index_name: string; const type_prefix: string; -const batch_size: count; +const max_batch_size: count; const max_batch_interval: interval; +const max_byte_size: count; diff --git a/src/logging/writers/ElasticSearch.cc b/src/logging/writers/ElasticSearch.cc index 46282404a6..fd028e9b68 100644 --- a/src/logging/writers/ElasticSearch.cc +++ b/src/logging/writers/ElasticSearch.cc @@ -186,31 +186,27 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields, { // Our action line looks like: // {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n - if ( counter == 0 ) - { - buffer.AddRaw("{\"index\":{\"_index\":\"", 20); - buffer.AddN((const char*) BifConst::LogElasticSearch::index_name->Bytes(), - BifConst::LogElasticSearch::index_name->Len()); - buffer.AddRaw("\",\"_type\":\"", 11); - buffer.AddN((const char*) BifConst::LogElasticSearch::type_prefix->Bytes(), - BifConst::LogElasticSearch::type_prefix->Len()); - buffer.Add(Path()); - buffer.AddRaw("\"}\n", 3); - } + buffer.AddRaw("{\"index\":{\"_index\":\"", 20); + buffer.AddN((const char*) BifConst::LogElasticSearch::index_name->Bytes(), + BifConst::LogElasticSearch::index_name->Len()); + buffer.AddRaw("\",\"_type\":\"", 11); + buffer.AddN((const char*) BifConst::LogElasticSearch::type_prefix->Bytes(), + BifConst::LogElasticSearch::type_prefix->Len()); + buffer.Add(Path()); + buffer.AddRaw("\"}\n", 3); + buffer.AddRaw("{", 1); for ( int i = 0; i < num_fields; i++ ) { - if ( i == 0 ) - buffer.AddRaw("{", 1); - else if ( buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present ) + if ( i > 0 && buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present ) buffer.AddRaw(",", 1); AddFieldToBuffer(vals[i], fields[i]); } - buffer.AddRaw("}\n", 2); counter++; - if ( counter >= BifConst::LogElasticSearch::batch_size ) + if ( counter >= BifConst::LogElasticSearch::max_batch_size || + uint(buffer.Len()) >= BifConst::LogElasticSearch::max_byte_size ) BatchIndex(); return true; @@ -219,6 +215,7 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields, bool ElasticSearch::DoRotate(string rotated_path, double open, double close, bool terminating) { //TODO: Determine what, if anything, needs to be done here. + return true; }