mirror of
https://github.com/zeek/zeek.git
synced 2025-10-17 05:58:20 +00:00
Bringing elasticsearch branch up to date with master.
This commit is contained in:
parent
601d1cf37e
commit
84e91b8b8d
5 changed files with 167 additions and 83 deletions
|
@ -2,4 +2,5 @@
|
||||||
@load ./postprocessors
|
@load ./postprocessors
|
||||||
@load ./writers/ascii
|
@load ./writers/ascii
|
||||||
@load ./writers/dataseries
|
@load ./writers/dataseries
|
||||||
@load ./writers/elasticsearch@load ./writers/none
|
@load ./writers/elasticsearch
|
||||||
|
@load ./writers/none
|
||||||
|
|
|
@ -11,7 +11,7 @@ export {
|
||||||
const server_port = 9200 &redef;
|
const server_port = 9200 &redef;
|
||||||
|
|
||||||
## Name of the ES index
|
## Name of the ES index
|
||||||
const index_name = "bro" &redef;
|
const index_prefix = "bro" &redef;
|
||||||
|
|
||||||
## The ES type prefix comes before the name of the related log.
|
## The ES type prefix comes before the name of the related log.
|
||||||
## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
|
## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
|
||||||
|
|
|
@ -89,7 +89,7 @@ module LogElasticSearch;
|
||||||
const cluster_name: string;
|
const cluster_name: string;
|
||||||
const server_host: string;
|
const server_host: string;
|
||||||
const server_port: count;
|
const server_port: count;
|
||||||
const index_name: string;
|
const index_prefix: string;
|
||||||
const type_prefix: string;
|
const type_prefix: string;
|
||||||
const max_batch_size: count;
|
const max_batch_size: count;
|
||||||
const max_batch_interval: interval;
|
const max_batch_interval: interval;
|
||||||
|
|
|
@ -30,8 +30,17 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
|
||||||
memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
|
memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
|
||||||
cluster_name[cluster_name_len] = 0;
|
cluster_name[cluster_name_len] = 0;
|
||||||
|
|
||||||
|
index_prefix = string((const char*) BifConst::LogElasticSearch::index_prefix->Bytes(), BifConst::LogElasticSearch::index_prefix->Len());
|
||||||
|
|
||||||
|
es_server = string(Fmt("http://%s:%d", BifConst::LogElasticSearch::server_host->Bytes(),
|
||||||
|
(int) BifConst::LogElasticSearch::server_port));
|
||||||
|
bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));
|
||||||
|
|
||||||
|
http_headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
|
||||||
buffer.Clear();
|
buffer.Clear();
|
||||||
counter = 0;
|
counter = 0;
|
||||||
|
current_index = string();
|
||||||
|
prev_index = string();
|
||||||
last_send = current_time();
|
last_send = current_time();
|
||||||
|
|
||||||
curl_handle = HTTPSetup();
|
curl_handle = HTTPSetup();
|
||||||
|
@ -42,67 +51,84 @@ ElasticSearch::~ElasticSearch()
|
||||||
delete [] cluster_name;
|
delete [] cluster_name;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::DoInit(string path, int num_fields, const Field* const * fields)
|
bool ElasticSearch::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
|
||||||
{
|
{
|
||||||
//TODO: Determine what, if anything, needs to be done here.
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::DoFlush()
|
bool ElasticSearch::DoFlush()
|
||||||
{
|
{
|
||||||
|
BatchIndex();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::DoFinish()
|
bool ElasticSearch::DoFinish()
|
||||||
{
|
{
|
||||||
BatchIndex();
|
BatchIndex();
|
||||||
|
curl_slist_free_all(http_headers);
|
||||||
curl_easy_cleanup(curl_handle);
|
curl_easy_cleanup(curl_handle);
|
||||||
return WriterBackend::DoFinish();
|
return WriterBackend::DoFinish();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::BatchIndex()
|
bool ElasticSearch::BatchIndex()
|
||||||
{
|
{
|
||||||
HTTPSend();
|
curl_easy_reset(curl_handle);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
|
||||||
|
HTTPSend(curl_handle);
|
||||||
|
|
||||||
buffer.Clear();
|
buffer.Clear();
|
||||||
counter = 0;
|
counter = 0;
|
||||||
last_send = current_time();
|
last_send = current_time();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
|
bool ElasticSearch::AddValueToBuffer(ODesc* b, Value* val)
|
||||||
{
|
{
|
||||||
switch ( val->type )
|
switch ( val->type )
|
||||||
{
|
{
|
||||||
// ES treats 0 as false and any other value as true so bool types go here.
|
// ES treats 0 as false and any other value as true so bool types go here.
|
||||||
case TYPE_BOOL:
|
case TYPE_BOOL:
|
||||||
case TYPE_INT:
|
case TYPE_INT:
|
||||||
buffer.Add(val->val.int_val);
|
b->Add(val->val.int_val);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TYPE_COUNT:
|
case TYPE_COUNT:
|
||||||
case TYPE_COUNTER:
|
case TYPE_COUNTER:
|
||||||
buffer.Add(val->val.uint_val);
|
{
|
||||||
|
// ElasticSearch doesn't seem to support unsigned 64bit ints.
|
||||||
|
if ( val->val.uint_val >= INT64_MAX )
|
||||||
|
{
|
||||||
|
Error(Fmt("count value too large: %" PRIu64, val->val.uint_val));
|
||||||
|
b->AddRaw("null", 4);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
b->Add(val->val.uint_val);
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case TYPE_PORT:
|
case TYPE_PORT:
|
||||||
buffer.Add(val->val.port_val.port);
|
b->Add(val->val.port_val.port);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TYPE_SUBNET:
|
case TYPE_SUBNET:
|
||||||
buffer.AddRaw("\"", 1);
|
b->AddRaw("\"", 1);
|
||||||
buffer.Add(Render(val->val.subnet_val));
|
b->Add(Render(val->val.subnet_val));
|
||||||
buffer.AddRaw("\"", 1);
|
b->AddRaw("\"", 1);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TYPE_ADDR:
|
case TYPE_ADDR:
|
||||||
buffer.AddRaw("\"", 1);
|
b->AddRaw("\"", 1);
|
||||||
buffer.Add(Render(val->val.addr_val));
|
b->Add(Render(val->val.addr_val));
|
||||||
buffer.AddRaw("\"", 1);
|
b->AddRaw("\"", 1);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TYPE_DOUBLE:
|
case TYPE_DOUBLE:
|
||||||
case TYPE_INTERVAL:
|
case TYPE_INTERVAL:
|
||||||
buffer.Add(val->val.double_val);
|
b->Add(val->val.double_val);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TYPE_TIME:
|
case TYPE_TIME:
|
||||||
|
@ -113,10 +139,10 @@ bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
|
||||||
if ( ts >= INT64_MAX )
|
if ( ts >= INT64_MAX )
|
||||||
{
|
{
|
||||||
Error(Fmt("time value too large: %" PRIu64, ts));
|
Error(Fmt("time value too large: %" PRIu64, ts));
|
||||||
buffer.AddRaw("null", 4);
|
b->AddRaw("null", 4);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
buffer.Add(ts);
|
b->Add(ts);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,51 +151,48 @@ bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
|
||||||
case TYPE_FILE:
|
case TYPE_FILE:
|
||||||
case TYPE_FUNC:
|
case TYPE_FUNC:
|
||||||
{
|
{
|
||||||
buffer.AddRaw("\"", 1);
|
b->AddRaw("\"", 1);
|
||||||
for ( uint i = 0; i < val->val.string_val->size(); ++i )
|
for ( uint i = 0; i < val->val.string_val->size(); ++i )
|
||||||
{
|
{
|
||||||
char c = val->val.string_val->data()[i];
|
char c = val->val.string_val->data()[i];
|
||||||
// HTML entity encode special characters.
|
// 2byte Unicode escape special characters.
|
||||||
if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
|
if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
|
||||||
{
|
{
|
||||||
static const char hex_chars[] = "0123456789abcdef";
|
static const char hex_chars[] = "0123456789abcdef";
|
||||||
buffer.AddRaw("\\u00", 4);
|
b->AddRaw("\\u00", 4);
|
||||||
buffer.AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
|
b->AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
|
||||||
buffer.AddRaw(&hex_chars[c & 0x0f], 1);
|
b->AddRaw(&hex_chars[c & 0x0f], 1);
|
||||||
//buffer.AddRaw("&#//", 2);
|
|
||||||
//buffer.Add((uint8_t) c);
|
|
||||||
//buffer.AddRaw(";", 1);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
buffer.AddRaw(&c, 1);
|
b->AddRaw(&c, 1);
|
||||||
}
|
}
|
||||||
buffer.AddRaw("\"", 1);
|
b->AddRaw("\"", 1);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TYPE_TABLE:
|
case TYPE_TABLE:
|
||||||
{
|
{
|
||||||
buffer.AddRaw("[", 1);
|
b->AddRaw("[", 1);
|
||||||
for ( int j = 0; j < val->val.set_val.size; j++ )
|
for ( int j = 0; j < val->val.set_val.size; j++ )
|
||||||
{
|
{
|
||||||
if ( j > 0 )
|
if ( j > 0 )
|
||||||
buffer.AddRaw(",", 1);
|
b->AddRaw(",", 1);
|
||||||
AddFieldValueToBuffer(val->val.set_val.vals[j], field);
|
AddValueToBuffer(b, val->val.set_val.vals[j]);
|
||||||
}
|
}
|
||||||
buffer.AddRaw("]", 1);
|
b->AddRaw("]", 1);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TYPE_VECTOR:
|
case TYPE_VECTOR:
|
||||||
{
|
{
|
||||||
buffer.AddRaw("[", 1);
|
b->AddRaw("[", 1);
|
||||||
for ( int j = 0; j < val->val.vector_val.size; j++ )
|
for ( int j = 0; j < val->val.vector_val.size; j++ )
|
||||||
{
|
{
|
||||||
if ( j > 0 )
|
if ( j > 0 )
|
||||||
buffer.AddRaw(",", 1);
|
b->AddRaw(",", 1);
|
||||||
AddFieldValueToBuffer(val->val.vector_val.vals[j], field);
|
AddValueToBuffer(b, val->val.vector_val.vals[j]);
|
||||||
}
|
}
|
||||||
buffer.AddRaw("]", 1);
|
b->AddRaw("]", 1);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,38 +202,37 @@ bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
|
bool ElasticSearch::AddFieldToBuffer(ODesc *b, Value* val, const Field* field)
|
||||||
{
|
{
|
||||||
if ( ! val->present )
|
if ( ! val->present )
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
buffer.AddRaw("\"", 1);
|
b->AddRaw("\"", 1);
|
||||||
buffer.Add(field->name);
|
b->Add(field->name);
|
||||||
buffer.AddRaw("\":", 2);
|
b->AddRaw("\":", 2);
|
||||||
AddFieldValueToBuffer(val, field);
|
AddValueToBuffer(b, val);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
|
bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
|
||||||
Value** vals)
|
Value** vals)
|
||||||
{
|
{
|
||||||
|
if ( current_index.empty() )
|
||||||
|
UpdateIndex(network_time, Info().rotation_interval, Info().rotation_base);
|
||||||
|
|
||||||
// Our action line looks like:
|
// Our action line looks like:
|
||||||
// {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n
|
|
||||||
buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
|
buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
|
||||||
buffer.AddN((const char*) BifConst::LogElasticSearch::index_name->Bytes(),
|
buffer.Add(current_index);
|
||||||
BifConst::LogElasticSearch::index_name->Len());
|
|
||||||
buffer.AddRaw("\",\"_type\":\"", 11);
|
buffer.AddRaw("\",\"_type\":\"", 11);
|
||||||
buffer.AddN((const char*) BifConst::LogElasticSearch::type_prefix->Bytes(),
|
buffer.Add(Info().path);
|
||||||
BifConst::LogElasticSearch::type_prefix->Len());
|
buffer.AddRaw("\"}}\n", 4);
|
||||||
buffer.Add(Path());
|
|
||||||
buffer.AddRaw("\"}\n", 3);
|
|
||||||
|
|
||||||
buffer.AddRaw("{", 1);
|
buffer.AddRaw("{", 1);
|
||||||
for ( int i = 0; i < num_fields; i++ )
|
for ( int i = 0; i < num_fields; i++ )
|
||||||
{
|
{
|
||||||
if ( i > 0 && buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present )
|
if ( i > 0 && buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present )
|
||||||
buffer.AddRaw(",", 1);
|
buffer.AddRaw(",", 1);
|
||||||
AddFieldToBuffer(vals[i], fields[i]);
|
AddFieldToBuffer(&buffer, vals[i], fields[i]);
|
||||||
}
|
}
|
||||||
buffer.AddRaw("}\n", 2);
|
buffer.AddRaw("}\n", 2);
|
||||||
|
|
||||||
|
@ -221,10 +243,63 @@ bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ElasticSearch::UpdateIndex(double now, double rinterval, double rbase)
|
||||||
|
{
|
||||||
|
if ( rinterval == 0 )
|
||||||
|
{
|
||||||
|
// if logs aren't being rotated, don't use a rotation oriented index name.
|
||||||
|
current_index = index_prefix;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
double nr = calc_next_rotate(now, rinterval, rbase);
|
||||||
|
double interval_beginning = now - (rinterval - nr);
|
||||||
|
|
||||||
|
struct tm tm;
|
||||||
|
char buf[128];
|
||||||
|
time_t teatime = (time_t)interval_beginning;
|
||||||
|
gmtime_r(&teatime, &tm);
|
||||||
|
strftime(buf, sizeof(buf), "%Y%m%d%H%M", &tm);
|
||||||
|
|
||||||
|
prev_index = current_index;
|
||||||
|
current_index = index_prefix + "-" + buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
//printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
bool ElasticSearch::DoRotate(string rotated_path, double open, double close, bool terminating)
|
bool ElasticSearch::DoRotate(string rotated_path, double open, double close, bool terminating)
|
||||||
{
|
{
|
||||||
//TODO: Determine what, if anything, needs to be done here.
|
// Update the currently used index to the new rotation interval.
|
||||||
|
UpdateIndex(close, Info().rotation_interval, Info().rotation_base);
|
||||||
|
|
||||||
|
// Only do this stuff if there was a previous index.
|
||||||
|
if ( ! prev_index.empty() )
|
||||||
|
{
|
||||||
|
// FIXME: I think this section is taking too long and causing the thread to die.
|
||||||
|
|
||||||
|
// Compress the previous index
|
||||||
|
//curl_easy_reset(curl_handle);
|
||||||
|
//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings", es_server.c_str(), prev_index.c_str()));
|
||||||
|
//curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
|
||||||
|
//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, "{\"index\":{\"store.compress.stored\":\"true\"}}");
|
||||||
|
//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) 42);
|
||||||
|
//HTTPSend(curl_handle);
|
||||||
|
|
||||||
|
// Optimize the previous index.
|
||||||
|
// TODO: make this into variables.
|
||||||
|
//curl_easy_reset(curl_handle);
|
||||||
|
//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
|
||||||
|
//HTTPSend(curl_handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
//if ( ! FinishedRotation(current_index, prev_index, open, close, terminating) )
|
||||||
|
// {
|
||||||
|
// Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
|
||||||
|
// }
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -237,7 +312,7 @@ bool ElasticSearch::DoSetBuf(bool enabled)
|
||||||
|
|
||||||
bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
|
bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
|
||||||
{
|
{
|
||||||
if ( last_send > 0 &&
|
if ( last_send > 0 && buffer.Len() > 0 &&
|
||||||
current_time-last_send > BifConst::LogElasticSearch::max_batch_interval )
|
current_time-last_send > BifConst::LogElasticSearch::max_batch_interval )
|
||||||
{
|
{
|
||||||
BatchIndex();
|
BatchIndex();
|
||||||
|
@ -247,31 +322,15 @@ bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// HTTP Functions start here.
|
|
||||||
|
|
||||||
CURL* ElasticSearch::HTTPSetup()
|
CURL* ElasticSearch::HTTPSetup()
|
||||||
{
|
{
|
||||||
const char *URL = fmt("http://%s:%d/_bulk", BifConst::LogElasticSearch::server_host->CheckString(),
|
CURL* handle = curl_easy_init();
|
||||||
(int) BifConst::LogElasticSearch::server_port);;
|
|
||||||
CURL* handle;
|
|
||||||
struct curl_slist *headers=NULL;
|
|
||||||
|
|
||||||
handle = curl_easy_init();
|
|
||||||
if ( ! handle )
|
if ( ! handle )
|
||||||
return handle;
|
{
|
||||||
|
Error("cURL did not initialize correctly.");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
//sprintf(URL, "http://%s:%d/_bulk", BifConst::LogElasticSearch::server_host->CheckString(), (int) BifConst::LogElasticSearch::server_port);
|
|
||||||
curl_easy_setopt(handle, CURLOPT_URL, URL);
|
|
||||||
|
|
||||||
headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
|
|
||||||
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
|
|
||||||
|
|
||||||
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
|
|
||||||
curl_easy_setopt(handle, CURLOPT_POST, 1); // All requests are POSTs
|
|
||||||
|
|
||||||
// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed. The best (only?) way to disable that is to
|
|
||||||
// just use HTTP 1.0
|
|
||||||
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
|
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -281,14 +340,16 @@ bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ElasticSearch::HTTPSend()
|
bool ElasticSearch::HTTPSend(CURL *handle)
|
||||||
{
|
{
|
||||||
CURLcode return_code;
|
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
|
||||||
|
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
|
||||||
|
// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed.
|
||||||
|
// The best (only?) way to disable that is to just use HTTP 1.0
|
||||||
|
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
|
||||||
|
|
||||||
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, buffer.Len());
|
CURLcode return_code = curl_easy_perform(handle);
|
||||||
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
|
|
||||||
|
|
||||||
return_code = curl_easy_perform(curl_handle);
|
|
||||||
switch ( return_code )
|
switch ( return_code )
|
||||||
{
|
{
|
||||||
case CURLE_COULDNT_CONNECT:
|
case CURLE_COULDNT_CONNECT:
|
||||||
|
@ -296,6 +357,16 @@ bool ElasticSearch::HTTPSend()
|
||||||
case CURLE_WRITE_ERROR:
|
case CURLE_WRITE_ERROR:
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
case CURLE_OK:
|
||||||
|
{
|
||||||
|
uint http_code = 0;
|
||||||
|
curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
|
||||||
|
if ( http_code != 200 )
|
||||||
|
Error(Fmt("Received a non-successful status code back from ElasticSearch server."));
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,8 +22,8 @@ public:
|
||||||
protected:
|
protected:
|
||||||
// Overidden from WriterBackend.
|
// Overidden from WriterBackend.
|
||||||
|
|
||||||
virtual bool DoInit(string path, int num_fields,
|
virtual bool DoInit(const WriterInfo& info, int num_fields,
|
||||||
const threading::Field* const * fields);
|
const threading::Field* const* fields);
|
||||||
|
|
||||||
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
|
||||||
threading::Value** vals);
|
threading::Value** vals);
|
||||||
|
@ -35,18 +35,22 @@ protected:
|
||||||
virtual bool DoHeartbeat(double network_time, double current_time);
|
virtual bool DoHeartbeat(double network_time, double current_time);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool AddFieldToBuffer(threading::Value* val, const threading::Field* field);
|
bool AddFieldToBuffer(ODesc *b, threading::Value* val, const threading::Field* field);
|
||||||
bool AddFieldValueToBuffer(threading::Value* val, const threading::Field* field);
|
bool AddValueToBuffer(ODesc *b, threading::Value* val);
|
||||||
bool BatchIndex();
|
bool BatchIndex();
|
||||||
|
bool SendMappings();
|
||||||
|
bool UpdateIndex(double now, double rinterval, double rbase);
|
||||||
|
|
||||||
CURL* HTTPSetup();
|
CURL* HTTPSetup();
|
||||||
bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
|
bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
|
||||||
bool HTTPSend();
|
bool HTTPSend(CURL *handle);
|
||||||
|
|
||||||
// Buffers, etc.
|
// Buffers, etc.
|
||||||
ODesc buffer;
|
ODesc buffer;
|
||||||
uint64 counter;
|
uint64 counter;
|
||||||
double last_send;
|
double last_send;
|
||||||
|
string current_index;
|
||||||
|
string prev_index;
|
||||||
|
|
||||||
CURL* curl_handle;
|
CURL* curl_handle;
|
||||||
|
|
||||||
|
@ -54,6 +58,14 @@ private:
|
||||||
char* cluster_name;
|
char* cluster_name;
|
||||||
int cluster_name_len;
|
int cluster_name_len;
|
||||||
|
|
||||||
|
string es_server;
|
||||||
|
string bulk_url;
|
||||||
|
|
||||||
|
struct curl_slist *http_headers;
|
||||||
|
|
||||||
|
string path;
|
||||||
|
string index_prefix;
|
||||||
|
|
||||||
uint64 batch_size;
|
uint64 batch_size;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue