Reduce the batch size to 1000 and add a maximum time interval for batches.

This commit is contained in:
Seth Hall 2012-06-15 20:53:09 -04:00
parent 0bb8b69c95
commit a4df914ab7
4 changed files with 50 additions and 20 deletions

View file

@ -20,6 +20,11 @@ export {
## The batch size is the number of messages that will be queued up before ## The batch size is the number of messages that will be queued up before
## they are sent to be bulk indexed. ## they are sent to be bulk indexed.
## Note: this is mainly a memory usage parameter. ## Note: this is mainly a memory usage parameter.
const batch_size = 10000 &redef; const batch_size = 1000 &redef;
## The maximum amount of wall-clock time that is allowed to pass without
## finishing a bulk log send. This represents the maximum delay you
## would like to have with your logs before they show up in ElasticSearch.
const max_batch_interval = 1min &redef;
} }

View file

@ -92,3 +92,4 @@ const server_port: count;
const index_name: string; const index_name: string;
const type_prefix: string; const type_prefix: string;
const batch_size: count; const batch_size: count;
const max_batch_interval: interval;

View file

@ -32,6 +32,7 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
buffer.Clear(); buffer.Clear();
counter = 0; counter = 0;
last_send = current_time();
curl_handle = HTTPSetup(); curl_handle = HTTPSetup();
curl_result = new char[1024]; curl_result = new char[1024];
@ -59,11 +60,20 @@ bool ElasticSearch::DoFinish()
return WriterBackend::DoFinish(); return WriterBackend::DoFinish();
} }
// Ships the currently buffered batch via HTTPSend() and then starts a
// fresh batch: the record counter and the buffer are reset and the
// time of this send is remembered in last_send.
//
// Always returns true.
// NOTE(review): the result of HTTPSend() is discarded here, so the
// buffered records are dropped whatever HTTPSend() returned -- confirm
// this best-effort behavior is intended.
bool ElasticSearch::BatchIndex()
	{
	// Must run before the buffer is cleared below.
	HTTPSend();

	// Begin a new, empty batch.
	counter = 0;
	buffer.Clear();
	last_send = current_time();

	return true;
	}
bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field) bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
{ {
switch ( val->type ) switch ( val->type )
{ {
// ElasticSearch defines bools as: 0 == false, everything else == true. So we treat it as an int. // ES treats 0 as false and any other value as true so bool types go here.
case TYPE_BOOL: case TYPE_BOOL:
case TYPE_INT: case TYPE_INT:
buffer.Add(val->val.int_val); buffer.Add(val->val.int_val);
@ -197,11 +207,8 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
counter++; counter++;
if ( counter >= BifConst::LogElasticSearch::batch_size ) if ( counter >= BifConst::LogElasticSearch::batch_size )
{ BatchIndex();
HTTPSend();
buffer.Clear();
counter = 0;
}
return true; return true;
} }
@ -217,6 +224,18 @@ bool ElasticSearch::DoSetBuf(bool enabled)
return true; return true;
} }
bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
{
if ( last_send > 0 &&
current_time-last_send > BifConst::LogElasticSearch::max_batch_interval )
{
BatchIndex();
}
return true;
}
// HTTP Functions start here. // HTTP Functions start here.
CURL* ElasticSearch::HTTPSetup() CURL* ElasticSearch::HTTPSetup()
@ -251,7 +270,8 @@ bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
return true; return true;
} }
bool ElasticSearch::HTTPSend(){ bool ElasticSearch::HTTPSend()
{
CURLcode return_code; CURLcode return_code;
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, curl_result); curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, curl_result);
@ -259,7 +279,8 @@ bool ElasticSearch::HTTPSend(){
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, buffer.Len()); curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, buffer.Len());
return_code = curl_easy_perform(curl_handle); return_code = curl_easy_perform(curl_handle);
switch(return_code) { switch ( return_code )
{
case CURLE_COULDNT_CONNECT: case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_HOST: case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_WRITE_ERROR: case CURLE_WRITE_ERROR:
@ -268,6 +289,6 @@ bool ElasticSearch::HTTPSend(){
default: default:
return true; return true;
} }
} }
#endif #endif

View file

@ -32,10 +32,12 @@ protected:
double close, bool terminating); double close, bool terminating);
virtual bool DoFlush(); virtual bool DoFlush();
virtual bool DoFinish(); virtual bool DoFinish();
virtual bool DoHeartbeat(double network_time, double current_time);
private: private:
bool AddFieldToBuffer(threading::Value* val, const threading::Field* field); bool AddFieldToBuffer(threading::Value* val, const threading::Field* field);
bool AddFieldValueToBuffer(threading::Value* val, const threading::Field* field); bool AddFieldValueToBuffer(threading::Value* val, const threading::Field* field);
bool BatchIndex();
CURL* HTTPSetup(); CURL* HTTPSetup();
bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata); bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
@ -44,6 +46,7 @@ private:
// Buffers, etc. // Buffers, etc.
ODesc buffer; ODesc buffer;
uint64 counter; uint64 counter;
double last_send;
CURL* curl_handle; CURL* curl_handle;
char* curl_result; char* curl_result;