Bug fix and feature.

- Fixed bug with how data is sent to elasticsearch.

- Added a feature to only allow data of a certain
  size to be buffered before sending to the
  elasticsearch server.  Configured with the
  LogElasticSearch::max_byte_size variable.
This commit is contained in:
Seth Hall 2012-06-16 22:22:40 -04:00
parent b1561437e9
commit cd8169dda3
3 changed files with 20 additions and 18 deletions

View file

@ -20,11 +20,15 @@ export {
## The batch size is the number of messages that will be queued up before
## they are sent to be bulk indexed.
## Note: this is mainly a memory usage parameter.
const batch_size = 1000 &redef;
const max_batch_size = 1000 &redef;
## The maximum amount of wall-clock time that is allowed to pass without
## finishing a bulk log send. This represents the maximum delay you
## would like to have with your logs before they show up in ElasticSearch.
const max_batch_interval = 1min &redef;
## The maximum byte size for a buffered JSON string to send to the bulk
## insert API.
const max_byte_size = 1024 * 1024 &redef;
}

View file

@ -91,5 +91,6 @@ const server_host: string;
const server_port: count;
const index_name: string;
const type_prefix: string;
const batch_size: count;
const max_batch_size: count;
const max_batch_interval: interval;
const max_byte_size: count;

View file

@ -186,31 +186,27 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
{
// Our action line looks like:
// {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n
if ( counter == 0 )
{
buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
buffer.AddN((const char*) BifConst::LogElasticSearch::index_name->Bytes(),
BifConst::LogElasticSearch::index_name->Len());
buffer.AddRaw("\",\"_type\":\"", 11);
buffer.AddN((const char*) BifConst::LogElasticSearch::type_prefix->Bytes(),
BifConst::LogElasticSearch::type_prefix->Len());
buffer.Add(Path());
buffer.AddRaw("\"}\n", 3);
}
buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
buffer.AddN((const char*) BifConst::LogElasticSearch::index_name->Bytes(),
BifConst::LogElasticSearch::index_name->Len());
buffer.AddRaw("\",\"_type\":\"", 11);
buffer.AddN((const char*) BifConst::LogElasticSearch::type_prefix->Bytes(),
BifConst::LogElasticSearch::type_prefix->Len());
buffer.Add(Path());
buffer.AddRaw("\"}\n", 3);
buffer.AddRaw("{", 1);
for ( int i = 0; i < num_fields; i++ )
{
if ( i == 0 )
buffer.AddRaw("{", 1);
else if ( buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present )
if ( i > 0 && buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present )
buffer.AddRaw(",", 1);
AddFieldToBuffer(vals[i], fields[i]);
}
buffer.AddRaw("}\n", 2);
counter++;
if ( counter >= BifConst::LogElasticSearch::batch_size )
if ( counter >= BifConst::LogElasticSearch::max_batch_size ||
uint(buffer.Len()) >= BifConst::LogElasticSearch::max_byte_size )
BatchIndex();
return true;
@ -219,6 +215,7 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
bool ElasticSearch::DoRotate(string rotated_path, double open, double close, bool terminating)
{
//TODO: Determine what, if anything, needs to be done here.
return true;
}