Merge remote-tracking branch 'remotes/origin/topic/seth/elasticsearch' into topic/robin/master-test

I've only tested that it compiles, not whether it still works. The
fact that we don't have any tests for this makes me uneasy ...

* remotes/origin/topic/seth/elasticsearch: (35 commits)
  Some documentation updates for elasticsearch plugin.
  Temporarily removing the ES timeout because it works with signals and is incompatible with Bro threads.
  Changed ES index names to localtime and added a meta index.
  New script for easily duplicating logs to ElasticSearch.
  Some better elasticsearch reliability.
  Fixed small elasticsearch problem in configure output.
  Re-adding the needed call to FinishedRotation in the ES writer plugin.
  Tiny updates.
  Bringing elasticsearch branch up to date with master.
  Adding a define to make the stdint C macros available.
  Adding an extra header.
  Fixed a bug with messed up time value passing to elasticsearch.
  Small updates and a little standardization for config.h.in naming.
  Bug fixes.
  Bug fix and feature.
  Forgot to call the parent method for DoHeartBeat.
  Changed the escaping method.
  Flush logs to ES daemon as Bro is shutting down.
  Reduce the batch size to 1000 and add a maximum time interval for batches.
  Reworked bulk operation string construction to use ODesc and added json escaping.
  ...
Robin Sommer, 2012-07-20 07:43:05 -07:00
commit eef8b7d1c4
18 changed files with 750 additions and 4 deletions


@@ -124,6 +124,17 @@ if (LINTEL_FOUND AND DATASERIES_FOUND AND LIBXML2_FOUND)
list(APPEND OPTLIBS ${LibXML2_LIBRARIES})
endif()
set(USE_ELASTICSEARCH false)
set(USE_CURL false)
find_package(CURL)
if (CURL_FOUND)
set(USE_ELASTICSEARCH true)
set(USE_CURL true)
include_directories(BEFORE ${CURL_INCLUDE_DIR})
list(APPEND OPTLIBS ${CURL_LIBRARIES})
endif()
if (ENABLE_PERFTOOLS_DEBUG)
# Just a no op to prevent CMake from complaining about manually-specified
# ENABLE_PERFTOOLS_DEBUG not being used if google perftools weren't found
@@ -215,7 +226,10 @@ message(
"\nGeoIP: ${USE_GEOIP}"
"\nGoogle perftools: ${USE_PERFTOOLS}"
"\n debugging: ${USE_PERFTOOLS_DEBUG}"
"\ncURL: ${USE_CURL}"
"\n"
"\nDataSeries: ${USE_DATASERIES}" "\nDataSeries: ${USE_DATASERIES}"
"\nElasticSearch: ${USE_ELASTICSEARCH}"
"\n" "\n"
"\n================================================================\n" "\n================================================================\n"
) )


@@ -114,9 +114,15 @@
/* Analyze Mobile IPv6 traffic */
#cmakedefine ENABLE_MOBILE_IPV6
/* Use libCurl. */
#cmakedefine USE_CURL
/* Use the DataSeries writer. */
#cmakedefine USE_DATASERIES
/* Use the ElasticSearch writer. */
#cmakedefine USE_ELASTICSEARCH
/* Version number of package */
#define VERSION "@VERSION@"


@@ -0,0 +1,95 @@
=========================================
Indexed Logging Output with ElasticSearch
=========================================
.. rst-class:: opening
Bro's default ASCII log format is not the most efficient way to store
and search large volumes of data. ElasticSearch is a search engine
built on top of Apache's Lucene project that scales well for both
distributed indexing and distributed searching.
.. contents::
Installing ElasticSearch
------------------------
ElasticSearch requires a JRE to run. Please download the latest version
from: <http://www.elasticsearch.org/download/>. Once extracted, start
ElasticSearch with::
# ./bin/elasticsearch
Compiling Bro with ElasticSearch Support
----------------------------------------
First, ensure that you have libcurl installed, then run configure::
# ./configure
[...]
====================| Bro Build Summary |=====================
[...]
cURL: true
[...]
ElasticSearch: true
[...]
================================================================
Activating ElasticSearch
------------------------
The direct way to use ElasticSearch is to switch *all* log files over to
ElasticSearch. To do that, just add ``redef
Log::default_writer=Log::WRITER_ELASTICSEARCH;`` to your ``local.bro``.
For testing, you can also just pass that on the command line::
bro -r trace.pcap Log::default_writer=Log::WRITER_ELASTICSEARCH
With that, Bro will write all of its logs into ElasticSearch. You can
inspect them using ElasticSearch's RESTful interface. For more
information, see: <http://www.elasticsearch.org/guide/reference/api/>.
There is also a rudimentary web interface to ElasticSearch, available at:
<http://mobz.github.com/elasticsearch-head/>.
You can also switch only individual files over to ElasticSearch by adding
code like this to your ``local.bro``:

.. code:: bro

    event bro_init()
        {
        local f = Log::get_filter(Conn::LOG, "default"); # Get default filter for connection log.
        f$writer = Log::WRITER_ELASTICSEARCH;            # Change writer type.
        Log::add_filter(Conn::LOG, f);                   # Replace filter with adapted version.
        }
Configuring ElasticSearch
-------------------------
Bro's ElasticSearch writer comes with a few configuration options:

- cluster_name: Currently unused.
- server_host: Where to send the data. Default: localhost.
- server_port: What port to send the data to. Default: 9200.
- index_prefix: ElasticSearch indexes are like databases in a standard DB
  model. This is the name of the index to which to send the data.
  Default: bro.
- type_prefix: ElasticSearch types are like tables in a standard DB model.
  This is a prefix that gets prepended to Bro log names; for example,
  type_prefix = "bro_" would create types "bro_dns", "bro_http", etc.
  Default: none.
- max_batch_size: How many messages to buffer before sending them to
  ElasticSearch in a single bulk request. This is mainly a memory
  optimization; changing it doesn't seem to affect indexing performance
  much. Default: 1,000.
- max_batch_interval: The maximum wall-clock time to let messages sit in
  the buffer before a batch is sent anyway. Default: 1 min.
- max_byte_size: The maximum buffered byte size of the JSON string before
  a bulk request is sent. Default: 1 MB.
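For instance, a minimal sketch of overriding a few of these options from
``local.bro`` (the host name and prefix below are only example values):

.. code:: bro

    redef LogElasticSearch::server_host = "10.0.0.5";
    redef LogElasticSearch::index_prefix = "bro-cluster1";
    redef LogElasticSearch::max_batch_size = 5000;
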
TODO
----
Lots.
- Perform multicast discovery for server.
- Better error detection.
- Better defaults (don't index loaded-plugins, for instance).


@@ -42,6 +42,7 @@ rest_target(${psd} base/frameworks/logging/postprocessors/scp.bro)
rest_target(${psd} base/frameworks/logging/postprocessors/sftp.bro)
rest_target(${psd} base/frameworks/logging/writers/ascii.bro)
rest_target(${psd} base/frameworks/logging/writers/dataseries.bro)
rest_target(${psd} base/frameworks/logging/writers/elasticsearch.bro)
rest_target(${psd} base/frameworks/logging/writers/none.bro)
rest_target(${psd} base/frameworks/metrics/cluster.bro)
rest_target(${psd} base/frameworks/metrics/main.bro)
@@ -145,6 +146,7 @@ rest_target(${psd} policy/protocols/ssl/known-certs.bro)
rest_target(${psd} policy/protocols/ssl/validate-certs.bro)
rest_target(${psd} policy/tuning/defaults/packet-fragments.bro)
rest_target(${psd} policy/tuning/defaults/warnings.bro)
rest_target(${psd} policy/tuning/logs-to-elasticsearch.bro)
rest_target(${psd} policy/tuning/track-all-assets.bro)
rest_target(${psd} site/local-manager.bro)
rest_target(${psd} site/local-proxy.bro)


@@ -2,4 +2,5 @@
@load ./postprocessors
@load ./writers/ascii
@load ./writers/dataseries
@load ./writers/elasticsearch
@load ./writers/none


@@ -0,0 +1,46 @@
##! Log writer for sending logs to an ElasticSearch server.
##!
##! Note: This module is in testing and is not yet considered stable!
##!
##! There is one known memory issue. If your ElasticSearch server is
##! running slowly and taking too long to respond to bulk insert
##! requests, the message queue to the writer thread will keep
##! growing, giving the appearance of a memory leak.
module LogElasticSearch;
export {
## Name of the ES cluster
const cluster_name = "elasticsearch" &redef;
## ES Server
const server_host = "127.0.0.1" &redef;
## ES Port
const server_port = 9200 &redef;
## Name of the ES index
const index_prefix = "bro" &redef;
## The ES type prefix comes before the name of the related log.
## e.g. prefix = "bro_" would create types of bro_dns, bro_software, etc.
const type_prefix = "" &redef;
## The time before an ElasticSearch transfer will timeout.
## This is not working!
const transfer_timeout = 2secs;
## The batch size is the number of messages that will be queued up before
## they are sent to be bulk indexed.
const max_batch_size = 1000 &redef;
## The maximum amount of wall-clock time that is allowed to pass without
## finishing a bulk log send. This represents the maximum delay you
## would like to have with your logs before they are sent to ElasticSearch.
const max_batch_interval = 1min &redef;
## The maximum byte size for a buffered JSON string to send to the bulk
## insert API.
const max_byte_size = 1024 * 1024 &redef;
}
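
The batching options above can be tuned from local.bro once this writer is
loaded; a minimal sketch (the values here are only examples):

redef LogElasticSearch::max_batch_size = 5000;
redef LogElasticSearch::max_batch_interval = 30secs;
redef LogElasticSearch::max_byte_size = 10 * 1024 * 1024;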


@@ -0,0 +1,45 @@
##! Load this script to enable global log output to an ElasticSearch database.
module LogElasticSearch;
export {
## An ElasticSearch-specific rotation interval.
const rotation_interval = 24hr &redef;
## Any :bro:enum:`Log::ID` in this set will be excluded from being
## sent to ElasticSearch by this script.
const excluded_log_ids: set[string] = set("Communication::LOG") &redef;
## If you want to explicitly only send certain :bro:enum:`Log::ID`
## streams, add them to this set. If the set remains empty, all will
## be sent. The :bro:id:`excluded_log_ids` option will remain in
## effect as well.
const send_logs: set[string] = set() &redef;
}
module Log;
event bro_init() &priority=-5
{
local my_filters: table[ID, string] of Filter = table();
for ( [id, name] in filters )
{
local filter = filters[id, name];
if ( fmt("%s", id) in LogElasticSearch::excluded_log_ids ||
(|LogElasticSearch::send_logs| > 0 && fmt("%s", id) !in LogElasticSearch::send_logs) )
next;
filter$name = cat(name, "-es");
filter$writer = Log::WRITER_ELASTICSEARCH;
filter$interv = LogElasticSearch::rotation_interval;
my_filters[id, name] = filter;
}
# This is done in a separate loop to avoid modifying the filters table
# while iterating over it, which would make the loop never end.
for ( [id, name] in my_filters )
{
Log::add_filter(id, my_filters[id, name]);
}
}
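
As a usage sketch, the selection options can be redefined from local.bro
(the stream names here are only examples):

redef LogElasticSearch::send_logs += { "Conn::LOG", "HTTP::LOG" };
redef LogElasticSearch::rotation_interval = 1hr;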


@@ -60,4 +60,5 @@
@load tuning/defaults/__load__.bro
@load tuning/defaults/packet-fragments.bro
@load tuning/defaults/warnings.bro
@load tuning/logs-to-elasticsearch.bro
@load tuning/track-all-assets.bro


@@ -428,6 +428,7 @@ set(bro_SRCS
logging/WriterFrontend.cc
logging/writers/Ascii.cc
logging/writers/DataSeries.cc
logging/writers/ElasticSearch.cc
logging/writers/None.cc
input/Manager.cc


@@ -82,6 +82,20 @@ const dump_schema: bool;
const use_integer_for_time: bool;
const num_threads: count;
# Options for the ElasticSearch writer.
module LogElasticSearch;
const cluster_name: string;
const server_host: string;
const server_port: count;
const index_prefix: string;
const type_prefix: string;
const transfer_timeout: interval;
const max_batch_size: count;
const max_batch_interval: interval;
const max_byte_size: count;
# Options for the None writer.
module LogNone;


@@ -18,6 +18,10 @@
#include "writers/Ascii.h"
#include "writers/None.h"
#ifdef USE_ELASTICSEARCH
#include "writers/ElasticSearch.h"
#endif
#ifdef USE_DATASERIES
#include "writers/DataSeries.h"
#endif
@@ -36,6 +40,11 @@ struct WriterDefinition {
WriterDefinition log_writers[] = {
{ BifEnum::Log::WRITER_NONE, "None", 0, writer::None::Instantiate },
{ BifEnum::Log::WRITER_ASCII, "Ascii", 0, writer::Ascii::Instantiate },
#ifdef USE_ELASTICSEARCH
{ BifEnum::Log::WRITER_ELASTICSEARCH, "ElasticSearch", 0, writer::ElasticSearch::Instantiate },
#endif
#ifdef USE_DATASERIES
{ BifEnum::Log::WRITER_DATASERIES, "DataSeries", 0, writer::DataSeries::Instantiate },
#endif


@@ -0,0 +1,415 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// This is experimental code that is not yet ready for production usage.
//
#include "config.h"
#ifdef USE_ELASTICSEARCH
#include <string>
#include <errno.h>
#include "util.h"
#include "BroString.h"
#include "NetVar.h"
#include "threading/SerialTypes.h"
#include <curl/curl.h>
#include <curl/easy.h>
#include "ElasticSearch.h"
using namespace logging;
using namespace writer;
using threading::Value;
using threading::Field;
ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
{
cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
cluster_name = new char[cluster_name_len + 1];
memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
cluster_name[cluster_name_len] = 0;
index_prefix = string((const char*) BifConst::LogElasticSearch::index_prefix->Bytes(), BifConst::LogElasticSearch::index_prefix->Len());
es_server = string(Fmt("http://%s:%d", BifConst::LogElasticSearch::server_host->Bytes(),
(int) BifConst::LogElasticSearch::server_port));
bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));
http_headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
buffer.Clear();
counter = 0;
current_index = string();
prev_index = string();
last_send = current_time();
failing = false;
transfer_timeout = BifConst::LogElasticSearch::transfer_timeout * 1000;
curl_handle = HTTPSetup();
}
ElasticSearch::~ElasticSearch()
{
delete [] cluster_name;
}
bool ElasticSearch::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
{
return true;
}
bool ElasticSearch::DoFlush(double network_time)
{
BatchIndex();
return true;
}
bool ElasticSearch::DoFinish(double network_time)
{
BatchIndex();
curl_slist_free_all(http_headers);
curl_easy_cleanup(curl_handle);
return true;
}
bool ElasticSearch::BatchIndex()
{
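// At this point the buffer holds newline-delimited bulk-API pairs (an
// action line followed by a document line), assembled in DoWrite() and
// UpdateIndex(), and is POSTed to the _bulk endpoint as one request.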
curl_easy_reset(curl_handle);
curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
failing = ! HTTPSend(curl_handle);
// We currently throw the data out regardless of whether the send failed. Fire and forget!
buffer.Clear();
counter = 0;
last_send = current_time();
return true;
}
bool ElasticSearch::AddValueToBuffer(ODesc* b, Value* val)
{
switch ( val->type )
{
// ES treats 0 as false and any other value as true so bool types go here.
case TYPE_BOOL:
case TYPE_INT:
b->Add(val->val.int_val);
break;
case TYPE_COUNT:
case TYPE_COUNTER:
{
// ElasticSearch doesn't seem to support unsigned 64bit ints.
if ( val->val.uint_val >= INT64_MAX )
{
Error(Fmt("count value too large: %" PRIu64, val->val.uint_val));
b->AddRaw("null", 4);
}
else
b->Add(val->val.uint_val);
break;
}
case TYPE_PORT:
b->Add(val->val.port_val.port);
break;
case TYPE_SUBNET:
b->AddRaw("\"", 1);
b->Add(Render(val->val.subnet_val));
b->AddRaw("\"", 1);
break;
case TYPE_ADDR:
b->AddRaw("\"", 1);
b->Add(Render(val->val.addr_val));
b->AddRaw("\"", 1);
break;
case TYPE_DOUBLE:
case TYPE_INTERVAL:
b->Add(val->val.double_val);
break;
case TYPE_TIME:
{
// ElasticSearch uses milliseconds for timestamps and json only
// supports signed ints (uints can be too large).
uint64_t ts = (uint64_t) (val->val.double_val * 1000);
if ( ts >= INT64_MAX )
{
Error(Fmt("time value too large: %" PRIu64, ts));
b->AddRaw("null", 4);
}
else
b->Add(ts);
break;
}
case TYPE_ENUM:
case TYPE_STRING:
case TYPE_FILE:
case TYPE_FUNC:
{
b->AddRaw("\"", 1);
for ( int i = 0; i < val->val.string_val.length; ++i )
{
char c = val->val.string_val.data[i];
// 2byte Unicode escape special characters.
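// e.g. '"' (0x22) becomes "\u0022" and a newline (0x0a) becomes "\u000a".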
if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
{
static const char hex_chars[] = "0123456789abcdef";
b->AddRaw("\\u00", 4);
b->AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
b->AddRaw(&hex_chars[c & 0x0f], 1);
}
else
b->AddRaw(&c, 1);
}
b->AddRaw("\"", 1);
break;
}
case TYPE_TABLE:
{
b->AddRaw("[", 1);
for ( int j = 0; j < val->val.set_val.size; j++ )
{
if ( j > 0 )
b->AddRaw(",", 1);
AddValueToBuffer(b, val->val.set_val.vals[j]);
}
b->AddRaw("]", 1);
break;
}
case TYPE_VECTOR:
{
b->AddRaw("[", 1);
for ( int j = 0; j < val->val.vector_val.size; j++ )
{
if ( j > 0 )
b->AddRaw(",", 1);
AddValueToBuffer(b, val->val.vector_val.vals[j]);
}
b->AddRaw("]", 1);
break;
}
default:
return false;
}
return true;
}
bool ElasticSearch::AddFieldToBuffer(ODesc *b, Value* val, const Field* field)
{
if ( ! val->present )
return false;
b->AddRaw("\"", 1);
b->Add(field->name);
b->AddRaw("\":", 2);
AddValueToBuffer(b, val);
return true;
}
bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
Value** vals)
{
if ( current_index.empty() )
UpdateIndex(network_time, Info().rotation_interval, Info().rotation_base);
// Our action line looks like:
//   {"index":{"_index":"<current_index>","_type":"<log path>"}}
buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
buffer.Add(current_index);
buffer.AddRaw("\",\"_type\":\"", 11);
buffer.Add(Info().path);
buffer.AddRaw("\"}}\n", 4);
buffer.AddRaw("{", 1);
for ( int i = 0; i < num_fields; i++ )
{
if ( i > 0 && buffer.Bytes()[buffer.Len() - 1] != ',' && vals[i]->present )
buffer.AddRaw(",", 1);
AddFieldToBuffer(&buffer, vals[i], fields[i]);
}
buffer.AddRaw("}\n", 2);
counter++;
if ( counter >= BifConst::LogElasticSearch::max_batch_size ||
uint(buffer.Len()) >= BifConst::LogElasticSearch::max_byte_size )
BatchIndex();
return true;
}
bool ElasticSearch::UpdateIndex(double now, double rinterval, double rbase)
{
if ( rinterval == 0 )
{
// if logs aren't being rotated, don't use a rotation oriented index name.
current_index = index_prefix;
}
else
{
double nr = calc_next_rotate(now, rinterval, rbase);
double interval_beginning = now - (rinterval - nr);
struct tm tm;
char buf[128];
time_t teatime = (time_t)interval_beginning;
localtime_r(&teatime, &tm);
strftime(buf, sizeof(buf), "%Y%m%d%H%M", &tm);
prev_index = current_index;
current_index = index_prefix + "-" + buf;
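// e.g. with the default "bro" prefix, this yields index names like "bro-201207201400".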
// Send some metadata about this index.
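// The pair of lines added below looks roughly like:
//   {"index":{"_index":"@bro-meta","_type":"index","_id":"<index>-<base>-<interval>"}}
//   {"name":"<index>","start":<interval start>,"end":<interval end>}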
buffer.AddRaw("{\"index\":{\"_index\":\"@", 21);
buffer.Add(index_prefix);
buffer.AddRaw("-meta\",\"_type\":\"index\",\"_id\":\"", 30);
buffer.Add(current_index);
buffer.AddRaw("-", 1);
buffer.Add(Info().rotation_base);
buffer.AddRaw("-", 1);
buffer.Add(Info().rotation_interval);
buffer.AddRaw("\"}}\n{\"name\":\"", 13);
buffer.Add(current_index);
buffer.AddRaw("\",\"start\":", 10);
buffer.Add(interval_beginning);
buffer.AddRaw(",\"end\":", 7);
buffer.Add(interval_beginning+rinterval);
buffer.AddRaw("}\n", 2);
}
//printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
return true;
}
bool ElasticSearch::DoRotate(const char* rotated_path, double open, double close, bool terminating)
{
// Update the currently used index to the new rotation interval.
UpdateIndex(close, Info().rotation_interval, Info().rotation_base);
// Only do this stuff if there was a previous index.
if ( ! prev_index.empty() )
{
// FIXME: I think this section is taking too long and causing the thread to die.
// Compress the previous index
//curl_easy_reset(curl_handle);
//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings", es_server.c_str(), prev_index.c_str()));
//curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, "{\"index\":{\"store.compress.stored\":\"true\"}}");
//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) 42);
//HTTPSend(curl_handle);
// Optimize the previous index.
// TODO: make this into variables.
//curl_easy_reset(curl_handle);
//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
//HTTPSend(curl_handle);
}
if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) )
{
Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));
}
return true;
}
bool ElasticSearch::DoSetBuf(bool enabled)
{
// Nothing to do.
return true;
}
bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
{
if ( last_send > 0 && buffer.Len() > 0 &&
current_time-last_send > BifConst::LogElasticSearch::max_batch_interval )
{
BatchIndex();
}
return true;
}
CURL* ElasticSearch::HTTPSetup()
{
CURL* handle = curl_easy_init();
if ( ! handle )
{
Error("cURL did not initialize correctly.");
return 0;
}
return handle;
}
size_t ElasticSearch::HTTPReceive(void* ptr, size_t size, size_t nmemb, void* userdata)
{
//TODO: Do some verification on the result?
// libcurl expects the number of bytes handled as the return value;
// returning anything else makes it report CURLE_WRITE_ERROR.
return size * nmemb;
}
bool ElasticSearch::HTTPSend(CURL *handle)
{
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.
// HTTP 1.1 likes to use chunked encoded transfers, which aren't good for speed.
// The best (only?) way to disable that is to just use HTTP 1.0
curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
//curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, transfer_timeout);
CURLcode return_code = curl_easy_perform(handle);
switch ( return_code )
{
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_WRITE_ERROR:
case CURLE_RECV_ERROR:
{
if ( ! failing )
Error(Fmt("ElasticSearch server may not be accessible."));
break;
}
case CURLE_OPERATION_TIMEDOUT:
{
if ( ! failing )
Warning(Fmt("HTTP operation with ElasticSearch server timed out at %" PRIu64 " msecs.", transfer_timeout));
break;
}
case CURLE_OK:
{
long http_code = 0;
curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
if ( http_code == 200 )
// Hopefully everything goes through here.
return true;
else if ( ! failing )
Error(Fmt("Received a non-successful status code back from the ElasticSearch server; check the ElasticSearch server log."));
break;
}
default:
break;
}
// The "successful" return happens above
return false;
}
#endif


@@ -0,0 +1,81 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Log writer for writing to an ElasticSearch database
//
// This is experimental code that is not yet ready for production usage.
//
#ifndef LOGGING_WRITER_ELASTICSEARCH_H
#define LOGGING_WRITER_ELASTICSEARCH_H
#include <curl/curl.h>
#include "../WriterBackend.h"
namespace logging { namespace writer {
class ElasticSearch : public WriterBackend {
public:
ElasticSearch(WriterFrontend* frontend);
~ElasticSearch();
static WriterBackend* Instantiate(WriterFrontend* frontend)
{ return new ElasticSearch(frontend); }
static string LogExt();
protected:
// Overridden from WriterBackend.
virtual bool DoInit(const WriterInfo& info, int num_fields,
const threading::Field* const* fields);
virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
threading::Value** vals);
virtual bool DoSetBuf(bool enabled);
virtual bool DoRotate(const char* rotated_path, double open,
double close, bool terminating);
virtual bool DoFlush(double network_time);
virtual bool DoFinish(double network_time);
virtual bool DoHeartbeat(double network_time, double current_time);
private:
bool AddFieldToBuffer(ODesc *b, threading::Value* val, const threading::Field* field);
bool AddValueToBuffer(ODesc *b, threading::Value* val);
bool BatchIndex();
bool SendMappings();
bool UpdateIndex(double now, double rinterval, double rbase);
CURL* HTTPSetup();
static size_t HTTPReceive(void* ptr, size_t size, size_t nmemb, void* userdata);
bool HTTPSend(CURL *handle);
// Buffers, etc.
ODesc buffer;
uint64 counter;
double last_send;
string current_index;
string prev_index;
CURL* curl_handle;
// From scripts
char* cluster_name;
int cluster_name_len;
string es_server;
string bulk_url;
struct curl_slist *http_headers;
string path;
string index_prefix;
uint64 transfer_timeout;
bool failing;
uint64 batch_size;
};
}
}
#endif


@@ -12,6 +12,10 @@
#include <getopt.h>
#endif
#ifdef USE_CURL
#include <curl/curl.h>
#endif
#ifdef USE_IDMEF
extern "C" {
#include <libidmef/idmefxml.h>
@@ -712,6 +716,10 @@ int main(int argc, char** argv)
SSL_library_init();
SSL_load_error_strings();
#ifdef USE_CURL
curl_global_init(CURL_GLOBAL_ALL);
#endif
// FIXME: On systems that don't provide /dev/urandom, OpenSSL doesn't
// seed the PRNG. We should do this here (but at least Linux, FreeBSD
// and Solaris provide /dev/urandom).
@@ -1062,6 +1070,10 @@ int main(int argc, char** argv)
done_with_network();
net_delete();
#ifdef USE_CURL
curl_global_cleanup();
#endif
terminate_bro();
// Close files after net_delete(), because net_delete()


@@ -163,6 +163,7 @@ enum Writer %{
WRITER_NONE,
WRITER_ASCII,
WRITER_DATASERIES,
WRITER_ELASTICSEARCH,
%}

enum ID %{


@@ -13,6 +13,7 @@
// Expose C99 functionality from inttypes.h, which would otherwise not be
// available in C++.
#define __STDC_FORMAT_MACROS
#define __STDC_LIMIT_MACROS
#include <inttypes.h>

#if __STDC__


@@ -3,7 +3,7 @@
#empty_field (empty)
#unset_field -
#path loaded_scripts
-#start 2012-07-20-01-49-31
+#start 2012-07-20-14-34-11
#fields name
#types string
scripts/base/init-bare.bro
@@ -21,6 +21,7 @@ scripts/base/init-bare.bro
scripts/base/frameworks/logging/./postprocessors/./sftp.bro
scripts/base/frameworks/logging/./writers/ascii.bro
scripts/base/frameworks/logging/./writers/dataseries.bro
scripts/base/frameworks/logging/./writers/elasticsearch.bro
scripts/base/frameworks/logging/./writers/none.bro
scripts/base/frameworks/input/__load__.bro
scripts/base/frameworks/input/./main.bro
@@ -29,4 +30,4 @@ scripts/base/init-bare.bro
scripts/base/frameworks/input/./readers/raw.bro
scripts/base/frameworks/input/./readers/benchmark.bro
scripts/policy/misc/loaded-scripts.bro
-#end 2012-07-20-01-49-31
+#end 2012-07-20-14-34-11


@@ -3,7 +3,7 @@
#empty_field (empty)
#unset_field -
#path loaded_scripts
-#start 2012-07-20-01-49-33
+#start 2012-07-20-14-34-40
#fields name
#types string
scripts/base/init-bare.bro
@@ -21,6 +21,7 @@ scripts/base/init-bare.bro
scripts/base/frameworks/logging/./postprocessors/./sftp.bro
scripts/base/frameworks/logging/./writers/ascii.bro
scripts/base/frameworks/logging/./writers/dataseries.bro
scripts/base/frameworks/logging/./writers/elasticsearch.bro
scripts/base/frameworks/logging/./writers/none.bro
scripts/base/frameworks/input/__load__.bro
scripts/base/frameworks/input/./main.bro
@@ -109,4 +110,4 @@ scripts/base/init-default.bro
scripts/base/protocols/syslog/./consts.bro
scripts/base/protocols/syslog/./main.bro
scripts/policy/misc/loaded-scripts.bro
-#end 2012-07-20-01-49-33
+#end 2012-07-20-14-34-40