mirror of
https://github.com/zeek/zeek.git
synced 2025-10-04 23:58:20 +00:00
Some better elasticsearch reliability.
- Added a configurable option for timing out ES HTTP requests. - Stop sending reporter messages after one message for one failure.
This commit is contained in:
parent
485e473561
commit
1fa182c169
4 changed files with 35 additions and 9 deletions
|
@ -17,6 +17,9 @@ export {
|
||||||
## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
|
## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
|
||||||
const type_prefix = "" &redef;
|
const type_prefix = "" &redef;
|
||||||
|
|
||||||
|
## The time before an ElasticSearch transfer will timeout.
|
||||||
|
const transfer_timeout = 2secs;
|
||||||
|
|
||||||
## The batch size is the number of messages that will be queued up before
|
## The batch size is the number of messages that will be queued up before
|
||||||
## they are sent to be bulk indexed.
|
## they are sent to be bulk indexed.
|
||||||
## Note: this is mainly a memory usage parameter.
|
## Note: this is mainly a memory usage parameter.
|
||||||
|
|
|
@ -91,6 +91,7 @@ const server_host: string;
|
||||||
const server_port: count;
|
const server_port: count;
|
||||||
const index_prefix: string;
|
const index_prefix: string;
|
||||||
const type_prefix: string;
|
const type_prefix: string;
|
||||||
|
const transfer_timeout: interval;
|
||||||
const max_batch_size: count;
|
const max_batch_size: count;
|
||||||
const max_batch_interval: interval;
|
const max_batch_interval: interval;
|
||||||
const max_byte_size: count;
|
const max_byte_size: count;
|
||||||
|
|
|
@ -42,6 +42,9 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
|
||||||
current_index = string();
|
current_index = string();
|
||||||
prev_index = string();
|
prev_index = string();
|
||||||
last_send = current_time();
|
last_send = current_time();
|
||||||
|
failing = false;
|
||||||
|
|
||||||
|
transfer_timeout = BifConst::LogElasticSearch::transfer_timeout * 1000;
|
||||||
|
|
||||||
curl_handle = HTTPSetup();
|
curl_handle = HTTPSetup();
|
||||||
}
|
}
|
||||||
|
@ -77,8 +80,9 @@ bool ElasticSearch::BatchIndex()
|
||||||
curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
|
curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
|
||||||
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
|
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
|
||||||
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
|
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
|
||||||
HTTPSend(curl_handle);
|
failing = ! HTTPSend(curl_handle);
|
||||||
|
|
||||||
|
// We are currently throwing the data out regardless of if the send failed. Fire and forget!
|
||||||
buffer.Clear();
|
buffer.Clear();
|
||||||
counter = 0;
|
counter = 0;
|
||||||
last_send = current_time();
|
last_send = current_time();
|
||||||
|
@ -348,6 +352,8 @@ bool ElasticSearch::HTTPSend(CURL *handle)
|
||||||
// The best (only?) way to disable that is to just use HTTP 1.0
|
// The best (only?) way to disable that is to just use HTTP 1.0
|
||||||
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
|
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
|
||||||
|
|
||||||
|
curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, transfer_timeout);
|
||||||
|
|
||||||
CURLcode return_code = curl_easy_perform(handle);
|
CURLcode return_code = curl_easy_perform(handle);
|
||||||
|
|
||||||
switch ( return_code )
|
switch ( return_code )
|
||||||
|
@ -355,21 +361,35 @@ bool ElasticSearch::HTTPSend(CURL *handle)
|
||||||
case CURLE_COULDNT_CONNECT:
|
case CURLE_COULDNT_CONNECT:
|
||||||
case CURLE_COULDNT_RESOLVE_HOST:
|
case CURLE_COULDNT_RESOLVE_HOST:
|
||||||
case CURLE_WRITE_ERROR:
|
case CURLE_WRITE_ERROR:
|
||||||
return false;
|
case CURLE_RECV_ERROR:
|
||||||
|
{
|
||||||
|
if ( ! failing )
|
||||||
|
Error(Fmt("ElasticSearch server may not be accessible."));
|
||||||
|
}
|
||||||
|
|
||||||
|
case CURLE_OPERATION_TIMEDOUT:
|
||||||
|
{
|
||||||
|
if ( ! failing )
|
||||||
|
Warning(Fmt("HTTP operation with elasticsearch server timed out at %" PRIu64 " msecs.", transfer_timeout));
|
||||||
|
}
|
||||||
|
|
||||||
case CURLE_OK:
|
case CURLE_OK:
|
||||||
{
|
{
|
||||||
uint http_code = 0;
|
uint http_code = 0;
|
||||||
curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
|
curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
|
||||||
if ( http_code != 200 )
|
if ( http_code == 200 )
|
||||||
Error(Fmt("Received a non-successful status code back from ElasticSearch server."));
|
// Hopefully everything goes through here.
|
||||||
|
return true;
|
||||||
return true;
|
else if ( ! failing )
|
||||||
|
Error(Fmt("Received a non-successful status code back from ElasticSearch server, check the elasticsearch server log."));
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return true;
|
{
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// The "successful" return happens above
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -65,6 +65,8 @@ private:
|
||||||
|
|
||||||
string path;
|
string path;
|
||||||
string index_prefix;
|
string index_prefix;
|
||||||
|
uint64 transfer_timeout;
|
||||||
|
bool failing;
|
||||||
|
|
||||||
uint64 batch_size;
|
uint64 batch_size;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue