mirror of
https://github.com/zeek/zeek.git
synced 2025-10-04 15:48:19 +00:00
Added sending messages to ElasticSearch over HTTP.
This commit is contained in:
parent
95f000738b
commit
7bee0b0d8e
11 changed files with 266 additions and 127 deletions
|
@ -122,6 +122,14 @@ if (LINTEL_FOUND AND DATASERIES_FOUND AND LIBXML2_FOUND)
|
||||||
list(APPEND OPTLIBS ${LibXML2_LIBRARIES})
|
list(APPEND OPTLIBS ${LibXML2_LIBRARIES})
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
set(USE_LIBCURL false)
|
||||||
|
find_package(CURL)
|
||||||
|
if (CURL_FOUND)
|
||||||
|
set(USE_LIBCURL true)
|
||||||
|
include_directories(BEFORE ${CURL_INCLUDE_DIR})
|
||||||
|
list(APPEND OPTLIBS ${CURL_LIBRARIES})
|
||||||
|
endif()
|
||||||
|
|
||||||
if (ENABLE_PERFTOOLS_DEBUG)
|
if (ENABLE_PERFTOOLS_DEBUG)
|
||||||
# Just a no op to prevent CMake from complaining about manually-specified
|
# Just a no op to prevent CMake from complaining about manually-specified
|
||||||
# ENABLE_PERFTOOLS_DEBUG not being used if google perftools weren't found
|
# ENABLE_PERFTOOLS_DEBUG not being used if google perftools weren't found
|
||||||
|
@ -209,11 +217,13 @@ message(
|
||||||
"\nBroccoli: ${INSTALL_BROCCOLI}"
|
"\nBroccoli: ${INSTALL_BROCCOLI}"
|
||||||
"\nBroctl: ${INSTALL_BROCTL}"
|
"\nBroctl: ${INSTALL_BROCTL}"
|
||||||
"\nAux. Tools: ${INSTALL_AUX_TOOLS}"
|
"\nAux. Tools: ${INSTALL_AUX_TOOLS}"
|
||||||
|
"\nElasticSearch: ${INSTALL_ELASTICSEARCH}"
|
||||||
"\n"
|
"\n"
|
||||||
"\nGeoIP: ${USE_GEOIP}"
|
"\nGeoIP: ${USE_GEOIP}"
|
||||||
"\nGoogle perftools: ${USE_PERFTOOLS}"
|
"\nGoogle perftools: ${USE_PERFTOOLS}"
|
||||||
"\n debugging: ${USE_PERFTOOLS_DEBUG}"
|
"\n debugging: ${USE_PERFTOOLS_DEBUG}"
|
||||||
"\nDataSeries: ${USE_DATASERIES}"
|
"\nDataSeries: ${USE_DATASERIES}"
|
||||||
|
"\nlibCURL: ${USE_LIBCURL}"
|
||||||
"\n"
|
"\n"
|
||||||
"\n================================================================\n"
|
"\n================================================================\n"
|
||||||
)
|
)
|
||||||
|
|
|
@ -117,6 +117,9 @@
|
||||||
/* Use the DataSeries writer. */
|
/* Use the DataSeries writer. */
|
||||||
#cmakedefine USE_DATASERIES
|
#cmakedefine USE_DATASERIES
|
||||||
|
|
||||||
|
/* Build the ElasticSearch writer. */
|
||||||
|
#cmakedefine INSTALL_ELASTICSEARCH
|
||||||
|
|
||||||
/* Version number of package */
|
/* Version number of package */
|
||||||
#define VERSION "@VERSION@"
|
#define VERSION "@VERSION@"
|
||||||
|
|
||||||
|
|
5
configure
vendored
5
configure
vendored
|
@ -35,6 +35,7 @@ Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||||
--disable-auxtools don't build or install auxiliary tools
|
--disable-auxtools don't build or install auxiliary tools
|
||||||
--disable-python don't try to build python bindings for broccoli
|
--disable-python don't try to build python bindings for broccoli
|
||||||
--disable-ruby don't try to build ruby bindings for broccoli
|
--disable-ruby don't try to build ruby bindings for broccoli
|
||||||
|
--enable-elasticsearch build the elasticsearch writer
|
||||||
|
|
||||||
Required Packages in Non-Standard Locations:
|
Required Packages in Non-Standard Locations:
|
||||||
--with-openssl=PATH path to OpenSSL install root
|
--with-openssl=PATH path to OpenSSL install root
|
||||||
|
@ -98,6 +99,7 @@ append_cache_entry BRO_SCRIPT_INSTALL_PATH STRING $prefix/share/bro
|
||||||
append_cache_entry BRO_ETC_INSTALL_DIR PATH $prefix/etc
|
append_cache_entry BRO_ETC_INSTALL_DIR PATH $prefix/etc
|
||||||
append_cache_entry ENABLE_DEBUG BOOL false
|
append_cache_entry ENABLE_DEBUG BOOL false
|
||||||
append_cache_entry ENABLE_PERFTOOLS_DEBUG BOOL false
|
append_cache_entry ENABLE_PERFTOOLS_DEBUG BOOL false
|
||||||
|
append_cache_entry INSTALL_ELASTICSEARCH BOOL false
|
||||||
append_cache_entry BinPAC_SKIP_INSTALL BOOL true
|
append_cache_entry BinPAC_SKIP_INSTALL BOOL true
|
||||||
append_cache_entry BUILD_SHARED_LIBS BOOL true
|
append_cache_entry BUILD_SHARED_LIBS BOOL true
|
||||||
append_cache_entry INSTALL_AUX_TOOLS BOOL true
|
append_cache_entry INSTALL_AUX_TOOLS BOOL true
|
||||||
|
@ -156,6 +158,9 @@ while [ $# -ne 0 ]; do
|
||||||
--disable-auxtools)
|
--disable-auxtools)
|
||||||
append_cache_entry INSTALL_AUX_TOOLS BOOL false
|
append_cache_entry INSTALL_AUX_TOOLS BOOL false
|
||||||
;;
|
;;
|
||||||
|
--enable-elasticsearch)
|
||||||
|
append_cache_entry INSTALL_ELASTICSEARCH BOOL true
|
||||||
|
;;
|
||||||
--disable-python)
|
--disable-python)
|
||||||
append_cache_entry DISABLE_PYTHON_BINDINGS BOOL true
|
append_cache_entry DISABLE_PYTHON_BINDINGS BOOL true
|
||||||
;;
|
;;
|
||||||
|
|
|
@ -2,3 +2,4 @@
|
||||||
@load ./postprocessors
|
@load ./postprocessors
|
||||||
@load ./writers/ascii
|
@load ./writers/ascii
|
||||||
@load ./writers/dataseries
|
@load ./writers/dataseries
|
||||||
|
@load ./writers/elasticsearch
|
25
scripts/base/frameworks/logging/writers/elasticsearch.bro
Normal file
25
scripts/base/frameworks/logging/writers/elasticsearch.bro
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
module LogElasticSearch;
|
||||||
|
|
||||||
|
export {
|
||||||
|
## Name of the ES cluster
|
||||||
|
const cluster_name = "elasticsearch" &redef;
|
||||||
|
|
||||||
|
## ES Server
|
||||||
|
const server_host = "127.0.0.1" &redef;
|
||||||
|
|
||||||
|
## ES Port
|
||||||
|
const server_port = 9200 &redef;
|
||||||
|
|
||||||
|
## Name of the ES index
|
||||||
|
const index_name = "bro-logs" &redef;
|
||||||
|
|
||||||
|
## The ES type prefix comes before the name of the related log.
|
||||||
|
## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
|
||||||
|
const type_prefix = "" &redef;
|
||||||
|
|
||||||
|
## The batch size is the number of messages that will be queued up before
|
||||||
|
## they are sent to be bulk indexed.
|
||||||
|
## Note: this is mainly a memory usage parameter.
|
||||||
|
const batch_size = 10000 &redef;
|
||||||
|
}
|
||||||
|
|
|
@ -419,6 +419,7 @@ set(bro_SRCS
|
||||||
logging/WriterFrontend.cc
|
logging/WriterFrontend.cc
|
||||||
logging/writers/Ascii.cc
|
logging/writers/Ascii.cc
|
||||||
logging/writers/DataSeries.cc
|
logging/writers/DataSeries.cc
|
||||||
|
logging/writers/ElasticSearch.cc
|
||||||
logging/writers/None.cc
|
logging/writers/None.cc
|
||||||
|
|
||||||
input/Manager.cc
|
input/Manager.cc
|
||||||
|
|
|
@ -81,3 +81,14 @@ const extent_size: count;
|
||||||
const dump_schema: bool;
|
const dump_schema: bool;
|
||||||
const use_integer_for_time: bool;
|
const use_integer_for_time: bool;
|
||||||
const num_threads: count;
|
const num_threads: count;
|
||||||
|
|
||||||
|
# Options for the ElasticSearch writer.
|
||||||
|
|
||||||
|
module LogElasticSearch;
|
||||||
|
|
||||||
|
const cluster_name: string;
|
||||||
|
const server_host: string;
|
||||||
|
const server_port: count;
|
||||||
|
const index_name: string;
|
||||||
|
const type_prefix: string;
|
||||||
|
const batch_size: count;
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
#include "writers/Ascii.h"
|
#include "writers/Ascii.h"
|
||||||
#include "writers/None.h"
|
#include "writers/None.h"
|
||||||
|
|
||||||
#ifdef USE_ELASTICSEARCH
|
#ifdef INSTALL_ELASTICSEARCH
|
||||||
#include "writers/ElasticSearch.h"
|
#include "writers/ElasticSearch.h"
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -40,8 +40,8 @@ WriterDefinition log_writers[] = {
|
||||||
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
|
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
|
||||||
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
|
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
|
||||||
|
|
||||||
#ifdef USE_ELASTICSEARCH
|
#ifdef INSTALL_ELASTICSEARCH
|
||||||
{ BifEnum::Log::WRITER_ASCII, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
|
{ BifEnum::Log::WRITER_ELASTICSEARCH, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef USE_DATASERIES
|
#ifdef USE_DATASERIES
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
|
|
||||||
#ifdef USE_ELASTICSEARCH
|
#ifdef INSTALL_ELASTICSEARCH
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
@ -12,6 +12,9 @@
|
||||||
#include "NetVar.h"
|
#include "NetVar.h"
|
||||||
#include "threading/SerialTypes.h"
|
#include "threading/SerialTypes.h"
|
||||||
|
|
||||||
|
#include <curl/curl.h>
|
||||||
|
#include <curl/easy.h>
|
||||||
|
|
||||||
#include "ElasticSearch.h"
|
#include "ElasticSearch.h"
|
||||||
|
|
||||||
using namespace logging;
|
using namespace logging;
|
||||||
|
@ -24,28 +27,35 @@ using threading::Field;
|
||||||
ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
|
ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
|
||||||
{
|
{
|
||||||
cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
|
cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
|
||||||
cluster_name = new char[cluster_name_len];
|
cluster_name = new char[cluster_name_len + 1];
|
||||||
memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
|
memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
|
||||||
|
cluster_name[cluster_name_len] = 0;
|
||||||
|
|
||||||
server_host_len = BifConst::LogElasticSearch::server_host->Len();
|
server_host_len = BifConst::LogElasticSearch::server_host->Len();
|
||||||
server_host = new char[server_host_len];
|
server_host = new char[server_host_len + 1];
|
||||||
memcpy(server_host, BifConst::LogElasticSearch::server_host->Bytes(), server_host_len);
|
memcpy(server_host, BifConst::LogElasticSearch::server_host->Bytes(), server_host_len);
|
||||||
|
server_host[server_host_len] = 0;
|
||||||
|
|
||||||
index_name_len = BifConst::LogElasticSearch::index_name->Len();
|
index_name_len = BifConst::LogElasticSearch::index_name->Len();
|
||||||
index_name = new char[index_name_len];
|
index_name = new char[index_name_len + 1];
|
||||||
memcpy(index_name, BifConst::LogElasticSearch::index_name->Bytes(), index_name_len);
|
memcpy(index_name, BifConst::LogElasticSearch::index_name->Bytes(), index_name_len);
|
||||||
|
index_name[index_name_len] = 0;
|
||||||
|
|
||||||
type_prefix_len = BifConst::LogElasticSearch::type_prefix->Len();
|
type_prefix_len = BifConst::LogElasticSearch::type_prefix->Len();
|
||||||
type_prefix = new char[type_prefix_len];
|
type_prefix = new char[type_prefix_len + 1];
|
||||||
memcpy(type_prefix, BifConst::LogElasticSearch::type_prefix->Bytes(), type_prefix_len);
|
memcpy(type_prefix, BifConst::LogElasticSearch::type_prefix->Bytes(), type_prefix_len);
|
||||||
|
type_prefix[type_prefix_len] = 0;
|
||||||
|
|
||||||
server_port = BifConst::LogElasticSearch::server_port;
|
server_port = BifConst::LogElasticSearch::server_port;
|
||||||
batch_size = BifConst::LogElasticSearch::batch_size;
|
batch_size = BifConst::LogElasticSearch::batch_size;
|
||||||
|
|
||||||
buffer = safe_malloc(MAX_EVENT_SIZE * batch_size);
|
buffer = (char *)safe_malloc(MAX_EVENT_SIZE * batch_size);
|
||||||
current_offset = 0;
|
current_offset = 0;
|
||||||
buffer[current_offset] = "\0";
|
buffer[current_offset] = 0;
|
||||||
counter = 0;
|
counter = 0;
|
||||||
|
|
||||||
|
curl_handle = HTTPSetup();
|
||||||
|
curl_result = new char[1024];
|
||||||
}
|
}
|
||||||
|
|
||||||
ElasticSearch::~ElasticSearch()
|
ElasticSearch::~ElasticSearch()
|
||||||
|
@ -74,58 +84,40 @@ bool ElasticSearch::DoFinish()
|
||||||
return WriterBackend::DoFinish();
|
return WriterBackend::DoFinish();
|
||||||
}
|
}
|
||||||
|
|
||||||
char* ElasticSearch::FormatField(const char* field_name, const char* field_value)
|
|
||||||
{
|
|
||||||
char* result = new char[MAX_EVENT_SIZE];
|
|
||||||
strcpy(result, "\"");
|
|
||||||
strcpy(result, field_name);
|
|
||||||
strcpy(result, "\":\"");
|
|
||||||
strcpy(result, field_value);
|
|
||||||
strcpy(result, "\"");
|
|
||||||
return result;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ElasticSearch::BatchIndex()
|
bool ElasticSearch::BatchIndex()
|
||||||
{
|
{
|
||||||
file = fopen("/tmp/batch.test", 'w');
|
return HTTPSend();
|
||||||
fwrite(buffer, current_offset, 1, file);
|
|
||||||
fclose(file);
|
|
||||||
file = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char* ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
|
char* ElasticSearch::FieldToString(Value* val, const Field* field)
|
||||||
{
|
{
|
||||||
if ( ! val->present )
|
char* result = new char[MAX_EVENT_SIZE];
|
||||||
{
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
|
|
||||||
switch ( val->type ) {
|
switch ( val->type ) {
|
||||||
|
|
||||||
|
// ElasticSearch defines bools as: 0 == false, everything else == true. So we treat it as an int.
|
||||||
case TYPE_BOOL:
|
case TYPE_BOOL:
|
||||||
return FormatField(field->name, val->val.int_val ? "T" : "F");
|
|
||||||
|
|
||||||
case TYPE_INT:
|
case TYPE_INT:
|
||||||
return FormatField(field->name, val->val.int_val);
|
sprintf(result, "%d", (int) val->val.int_val); return result;
|
||||||
|
|
||||||
case TYPE_COUNT:
|
case TYPE_COUNT:
|
||||||
case TYPE_COUNTER:
|
case TYPE_COUNTER:
|
||||||
return FormatField(field->name, val->val.uint_val);
|
sprintf(result, "%d", (int) val->val.uint_val); return result;
|
||||||
|
|
||||||
case TYPE_PORT:
|
case TYPE_PORT:
|
||||||
return FormatField(field->name, val->val.port_val.port);
|
sprintf(result, "%d", (int) val->val.port_val.port); return result;
|
||||||
|
|
||||||
case TYPE_SUBNET:
|
case TYPE_SUBNET:
|
||||||
return FormatField(field->name, Render(val->val.subnet_val));
|
sprintf(result, "\"%s\"", Render(val->val.subnet_val).c_str()); return result;
|
||||||
|
|
||||||
case TYPE_ADDR:
|
case TYPE_ADDR:
|
||||||
return FormatField(field->name, Render(val->val.addr_val));
|
sprintf(result, "\"%s\"", Render(val->val.addr_val).c_str()); return result;
|
||||||
|
|
||||||
case TYPE_INTERVAL:
|
case TYPE_INTERVAL:
|
||||||
case TYPE_TIME:
|
case TYPE_TIME:
|
||||||
|
sprintf(result, "\"%d\"", (int) (val->val.double_val * 1000)); return result;
|
||||||
case TYPE_DOUBLE:
|
case TYPE_DOUBLE:
|
||||||
return FormatField(field->name, val->val.double_val);
|
sprintf(result, "\"%s\"", Render(val->val.double_val).c_str()); return result;
|
||||||
|
|
||||||
case TYPE_ENUM:
|
case TYPE_ENUM:
|
||||||
case TYPE_STRING:
|
case TYPE_STRING:
|
||||||
|
@ -136,93 +128,123 @@ char* ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
|
||||||
const char* data = val->val.string_val->data();
|
const char* data = val->val.string_val->data();
|
||||||
|
|
||||||
if ( ! size )
|
if ( ! size )
|
||||||
return "";
|
return 0;
|
||||||
return FormatField(field->name, val->val.string_val->data());
|
sprintf(result, "\"%s\"", data); return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TYPE_TABLE:
|
case TYPE_TABLE:
|
||||||
{
|
{
|
||||||
if ( ! val->val.set_val.size )
|
|
||||||
return "";
|
|
||||||
|
|
||||||
char* tmp = new char[MAX_EVENT_SIZE];
|
char* tmp = new char[MAX_EVENT_SIZE];
|
||||||
|
int tmp_offset = 0;
|
||||||
strcpy(tmp, "{");
|
strcpy(tmp, "{");
|
||||||
|
tmp_offset = 1;
|
||||||
|
bool result_seen = false;
|
||||||
for ( int j = 0; j < val->val.set_val.size; j++ )
|
for ( int j = 0; j < val->val.set_val.size; j++ )
|
||||||
{
|
{
|
||||||
char* result = AddFieldToBuffer(val->val.set_val.vals[j], field);
|
char* sub_field = FieldToString(val->val.set_val.vals[j], field);
|
||||||
bool resultSeen = false;
|
if ( sub_field ){
|
||||||
if ( result ){
|
|
||||||
if ( resultSeen )
|
if ( result_seen ){
|
||||||
strcpy(tmp, ",");
|
strcpy(tmp + tmp_offset, ",");
|
||||||
strcpy(tmp, result);
|
tmp_offset += 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
result_seen = true;
|
||||||
|
|
||||||
|
sprintf(tmp + tmp_offset, "\"%s\":%s", field->name.c_str(), sub_field);
|
||||||
|
tmp_offset = strlen(tmp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return FormatField(field->name, tmp);
|
strcpy(tmp + tmp_offset, "}");
|
||||||
|
tmp_offset += 1;
|
||||||
|
sprintf(result, "%s", tmp);
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TYPE_VECTOR:
|
case TYPE_VECTOR:
|
||||||
{
|
{
|
||||||
if ( ! val->val.vector_val.size )
|
|
||||||
return "";
|
|
||||||
|
|
||||||
char* tmp = new char[MAX_EVENT_SIZE];
|
char* tmp = new char[MAX_EVENT_SIZE];
|
||||||
|
int tmp_offset = 0;
|
||||||
strcpy(tmp, "{");
|
strcpy(tmp, "{");
|
||||||
|
tmp_offset = 1;
|
||||||
|
bool result_seen = false;
|
||||||
for ( int j = 0; j < val->val.vector_val.size; j++ )
|
for ( int j = 0; j < val->val.vector_val.size; j++ )
|
||||||
{
|
{
|
||||||
char* result = AddFieldToBuffer(val->val.vector_val.vals[j], field);
|
char* sub_field = FieldToString(val->val.vector_val.vals[j], field);
|
||||||
bool resultSeen = false;
|
if ( sub_field ){
|
||||||
if ( result ){
|
|
||||||
if ( resultSeen )
|
if ( result_seen ){
|
||||||
strcpy(tmp, ",");
|
strcpy(tmp + tmp_offset, ",");
|
||||||
strcpy(tmp, result);
|
tmp_offset += 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
result_seen = true;
|
||||||
|
|
||||||
|
sprintf(tmp + tmp_offset, "\"%s\":%s", field->name.c_str(), sub_field);
|
||||||
|
tmp_offset = strlen(tmp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return FormatField(field->name, tmp);
|
strcpy(tmp + tmp_offset, "}");
|
||||||
|
tmp_offset += 1;
|
||||||
|
sprintf(result, "%s", tmp);
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return "";
|
{
|
||||||
|
return (char *)"{}";
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
char* ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
|
||||||
|
{
|
||||||
|
if ( ! val->present )
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
char* result = new char[MAX_EVENT_SIZE];
|
||||||
|
sprintf(result, "\"%s\":%s", field->name.c_str(), FieldToString(val, field));
|
||||||
|
return result;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
|
bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
|
||||||
Value** vals)
|
Value** vals)
|
||||||
{
|
{
|
||||||
// Our action line looks like:
|
// Our action line looks like:
|
||||||
// {"index":"$index_name","type":"$type_prefix$path"}\n{
|
// {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n{
|
||||||
|
|
||||||
bool resultSeen = false;
|
bool resultSeen = false;
|
||||||
|
|
||||||
for ( int i = 0; i < num_fields; i++ )
|
for ( int i = 0; i < num_fields; i++ )
|
||||||
{
|
{
|
||||||
char* result = DoWriteOne(vals[i], fields[i]);
|
char* result = AddFieldToBuffer(vals[i], fields[i]);
|
||||||
if ( result ) {
|
if ( result ) {
|
||||||
if ( ! resultSeen ) {
|
if ( ! resultSeen ) {
|
||||||
strcpy(buffer[current_offset], "{\"index\":\"");
|
current_offset += sprintf(buffer + current_offset, "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s%s\"}\n{", index_name, type_prefix, Path().c_str());
|
||||||
strcat(buffer[current_offset], index_name);
|
|
||||||
strcat(buffer[current_offset], "\",\"type\":\"");
|
|
||||||
strcat(buffer[current_offset], type_prefix);
|
|
||||||
strcat(buffer[current_offset], Path());
|
|
||||||
strcat(buffer[current_offset], "\"}\n{");
|
|
||||||
current_offset = strlen(buffer);
|
|
||||||
resultSeen = true;
|
resultSeen = true;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
strcat(buffer[current_offset], ",");
|
strcat(buffer, ",");
|
||||||
current_offset += 1;
|
current_offset += 1;
|
||||||
}
|
}
|
||||||
strcat(buffer[current_offset], result);
|
strcat(buffer, result);
|
||||||
current_offset += strlen(result);
|
current_offset += strlen(result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( resultSeen ) {
|
if ( resultSeen ) {
|
||||||
strcat(buffer[current_offset], "}\n");
|
strcat(buffer, "}\n");
|
||||||
current_offset += 2;
|
current_offset += 2;
|
||||||
counter += 1;
|
counter += 1;
|
||||||
if ( counter >= batch_size )
|
if ( counter >= batch_size ){
|
||||||
BatchIndex();
|
BatchIndex();
|
||||||
|
current_offset = 0;
|
||||||
|
buffer[current_offset] = 0;
|
||||||
|
counter = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -239,4 +261,55 @@ bool ElasticSearch::DoSetBuf(bool enabled)
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HTTP Functions start here.
|
||||||
|
|
||||||
|
CURL* ElasticSearch::HTTPSetup()
|
||||||
|
{
|
||||||
|
char URL[2048];
|
||||||
|
CURL* handle;
|
||||||
|
struct curl_slist *headers=NULL;
|
||||||
|
|
||||||
|
handle = curl_easy_init();
|
||||||
|
if ( ! handle )
|
||||||
|
return handle;
|
||||||
|
|
||||||
|
sprintf(URL, "http://%s:%d/_bulk", server_host, (int) server_port);
|
||||||
|
curl_easy_setopt(handle, CURLOPT_URL, URL);
|
||||||
|
|
||||||
|
headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
|
||||||
|
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);
|
||||||
|
|
||||||
|
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
|
||||||
|
curl_easy_setopt(handle, CURLOPT_POST, 1); // All requests are POSTs
|
||||||
|
|
||||||
|
// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed. The best (only?) way to disable that is to
|
||||||
|
// just use HTTP 1.0
|
||||||
|
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
|
||||||
|
return handle;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata){
|
||||||
|
//TODO: Do some verification on the result?
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ElasticSearch::HTTPSend(){
|
||||||
|
CURLcode return_code;
|
||||||
|
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, curl_result);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer);
|
||||||
|
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, current_offset);
|
||||||
|
|
||||||
|
return_code = curl_easy_perform(curl_handle);
|
||||||
|
switch(return_code) {
|
||||||
|
case CURLE_COULDNT_CONNECT:
|
||||||
|
case CURLE_COULDNT_RESOLVE_HOST:
|
||||||
|
case CURLE_WRITE_ERROR:
|
||||||
|
return false;
|
||||||
|
default:
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
#ifndef LOGGING_WRITER_ELASTICSEARCH_H
|
#ifndef LOGGING_WRITER_ELASTICSEARCH_H
|
||||||
#define LOGGING_WRITER_ELASTICSEARCH_H
|
#define LOGGING_WRITER_ELASTICSEARCH_H
|
||||||
|
|
||||||
|
#include <curl/curl.h>
|
||||||
#include "../WriterBackend.h"
|
#include "../WriterBackend.h"
|
||||||
|
|
||||||
namespace logging { namespace writer {
|
namespace logging { namespace writer {
|
||||||
|
@ -34,12 +35,20 @@ protected:
|
||||||
|
|
||||||
private:
|
private:
|
||||||
char* AddFieldToBuffer(threading::Value* val, const threading::Field* field);
|
char* AddFieldToBuffer(threading::Value* val, const threading::Field* field);
|
||||||
char* FormatField(const char* field_name, const char* field_value);
|
char* FieldToString(threading::Value* val, const threading::Field* field);
|
||||||
bool BatchIndex();
|
bool BatchIndex();
|
||||||
|
|
||||||
|
CURL* HTTPSetup();
|
||||||
|
bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
|
||||||
|
bool HTTPSend();
|
||||||
|
|
||||||
|
// Buffers, etc.
|
||||||
char* buffer;
|
char* buffer;
|
||||||
int current_offset;
|
int current_offset;
|
||||||
int counter;
|
uint64 counter;
|
||||||
|
|
||||||
|
CURL* curl_handle;
|
||||||
|
char* curl_result;
|
||||||
|
|
||||||
// From scripts
|
// From scripts
|
||||||
char* cluster_name;
|
char* cluster_name;
|
||||||
|
|
|
@ -163,6 +163,7 @@ enum Writer %{
|
||||||
WRITER_NONE,
|
WRITER_NONE,
|
||||||
WRITER_ASCII,
|
WRITER_ASCII,
|
||||||
WRITER_DATASERIES,
|
WRITER_DATASERIES,
|
||||||
|
WRITER_ELASTICSEARCH,
|
||||||
%}
|
%}
|
||||||
|
|
||||||
enum ID %{
|
enum ID %{
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue