Mirror of https://github.com/zeek/zeek.git, synced 2025-10-07 09:08:20 +00:00
Move DataSeries and ElasticSearch into plugins.
parent 8031da4ee7
commit 8737eae906
35 changed files with 9 additions and 2297 deletions
src/logging/writers/CMakeLists.txt
@@ -1,6 +1,4 @@
 add_subdirectory(ascii)
-add_subdirectory(dataseries)
-add_subdirectory(elasticsearch)
 add_subdirectory(none)
 add_subdirectory(sqlite)
src/logging/writers/dataseries/CMakeLists.txt
@@ -1,26 +0,0 @@
include(BroPlugin)

find_package(Lintel)
find_package(DataSeries)
find_package(LibXML2)

if (NOT DISABLE_DATASERIES AND
    LINTEL_FOUND AND DATASERIES_FOUND AND LIBXML2_FOUND)

	include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})

	include_directories(BEFORE ${Lintel_INCLUDE_DIR})
	include_directories(BEFORE ${DataSeries_INCLUDE_DIR})
	include_directories(BEFORE ${LibXML2_INCLUDE_DIR})

	bro_plugin_begin(Bro DataSeriesWriter)
	bro_plugin_cc(DataSeries.cc Plugin.cc)
	bro_plugin_bif(dataseries.bif)
	bro_plugin_link_library(${Lintel_LIBRARIES})
	bro_plugin_link_library(${DataSeries_LIBRARIES})
	bro_plugin_link_library(${LibXML2_LIBRARIES})
	bro_plugin_end()

endif()
src/logging/writers/dataseries/DataSeries.cc
@@ -1,459 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.

#include "config.h"

#include <map>
#include <string>
#include <errno.h>

#include <DataSeries/GeneralField.hpp>

#include "NetVar.h"
#include "threading/SerialTypes.h"

#include "DataSeries.h"
#include "dataseries.bif.h"

using namespace logging;
using namespace writer;

std::string DataSeries::LogValueToString(threading::Value *val)
	{
	// In some cases, no value is attached. If this is the case, return
	// an empty string.
	if( ! val->present )
		return "";

	switch(val->type) {
	case TYPE_BOOL:
		return (val->val.int_val ? "true" : "false");

	case TYPE_INT:
		{
		std::ostringstream ostr;
		ostr << val->val.int_val;
		return ostr.str();
		}

	case TYPE_COUNT:
	case TYPE_COUNTER:
	case TYPE_PORT:
		{
		std::ostringstream ostr;
		ostr << val->val.uint_val;
		return ostr.str();
		}

	case TYPE_SUBNET:
		return ascii->Render(val->val.subnet_val);

	case TYPE_ADDR:
		return ascii->Render(val->val.addr_val);

	// Note: These two cases are relatively special. We need to convert
	// these values into their integer equivalents to maximize precision.
	// At the moment, there won't be a noticeable effect (Bro uses the
	// double format everywhere internally, so we've already lost the
	// precision we'd gain here), but timestamps may eventually switch to
	// this representation within Bro.
	//
	// In the near-term, this *should* lead to better pack_relative (and
	// thus smaller output files).
	case TYPE_TIME:
	case TYPE_INTERVAL:
		if ( ds_use_integer_for_time )
			{
			std::ostringstream ostr;
			ostr << (uint64_t)(DataSeries::TIME_SCALE * val->val.double_val);
			return ostr.str();
			}
		else
			return ascii->Render(val->val.double_val);

	case TYPE_DOUBLE:
		return ascii->Render(val->val.double_val);

	case TYPE_ENUM:
	case TYPE_STRING:
	case TYPE_FILE:
	case TYPE_FUNC:
		if ( ! val->val.string_val.length )
			return "";

		return string(val->val.string_val.data, val->val.string_val.length);

	case TYPE_TABLE:
		{
		if ( ! val->val.set_val.size )
			return "";

		string tmpString = "";

		for ( int j = 0; j < val->val.set_val.size; j++ )
			{
			if ( j > 0 )
				tmpString += ds_set_separator;

			tmpString += LogValueToString(val->val.set_val.vals[j]);
			}

		return tmpString;
		}

	case TYPE_VECTOR:
		{
		if ( ! val->val.vector_val.size )
			return "";

		string tmpString = "";

		for ( int j = 0; j < val->val.vector_val.size; j++ )
			{
			if ( j > 0 )
				tmpString += ds_set_separator;

			tmpString += LogValueToString(val->val.vector_val.vals[j]);
			}

		return tmpString;
		}

	default:
		InternalError(Fmt("unknown type %s in DataSeries::LogValueToString", type_name(val->type)));
		return "cannot be reached";
	}
	}

string DataSeries::GetDSFieldType(const threading::Field *field)
	{
	switch(field->type) {
	case TYPE_BOOL:
		return "bool";

	case TYPE_COUNT:
	case TYPE_COUNTER:
	case TYPE_PORT:
	case TYPE_INT:
		return "int64";

	case TYPE_DOUBLE:
		return "double";

	case TYPE_TIME:
	case TYPE_INTERVAL:
		return ds_use_integer_for_time ? "int64" : "double";

	case TYPE_SUBNET:
	case TYPE_ADDR:
	case TYPE_ENUM:
	case TYPE_STRING:
	case TYPE_FILE:
	case TYPE_TABLE:
	case TYPE_VECTOR:
	case TYPE_FUNC:
		return "variable32";

	default:
		InternalError(Fmt("unknown type %s in DataSeries::GetDSFieldType", type_name(field->type)));
		return "cannot be reached";
	}
	}

string DataSeries::BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle)
	{
	if( ! sTitle.size() )
		sTitle = "GenericBroStream";

	string xmlschema = "<ExtentType name=\""
		+ sTitle
		+ "\" version=\"1.0\" namespace=\"bro.org\">\n";

	for( size_t i = 0; i < vals.size(); ++i )
		{
		xmlschema += "\t<field type=\""
			+ vals[i].ds_type
			+ "\" name=\""
			+ vals[i].field_name
			+ "\" " + vals[i].field_options
			+ "/>\n";
		}

	xmlschema += "</ExtentType>\n";

	for( size_t i = 0; i < vals.size(); ++i )
		{
		xmlschema += "<!-- " + vals[i].field_name
			+ " : " + vals[i].bro_type
			+ " -->\n";
		}

	return xmlschema;
	}

std::string DataSeries::GetDSOptionsForType(const threading::Field *field)
	{
	switch( field->type ) {
	case TYPE_TIME:
	case TYPE_INTERVAL:
		{
		std::string s;
		s += "pack_relative=\"" + std::string(field->name) + "\"";

		if ( ! ds_use_integer_for_time )
			s += " pack_scale=\"1e-6\" print_format=\"%.6f\" pack_scale_warn=\"no\"";
		else
			s += string(" units=\"") + TIME_UNIT() + "\" epoch=\"unix\"";

		return s;
		}

	case TYPE_SUBNET:
	case TYPE_ADDR:
	case TYPE_ENUM:
	case TYPE_STRING:
	case TYPE_FILE:
	case TYPE_TABLE:
	case TYPE_VECTOR:
		return "pack_unique=\"yes\"";

	default:
		return "";
	}
	}

DataSeries::DataSeries(WriterFrontend* frontend) : WriterBackend(frontend)
	{
	ds_compression = string((const char *)BifConst::LogDataSeries::compression->Bytes(),
				BifConst::LogDataSeries::compression->Len());
	ds_dump_schema = BifConst::LogDataSeries::dump_schema;
	ds_extent_size = BifConst::LogDataSeries::extent_size;
	ds_num_threads = BifConst::LogDataSeries::num_threads;
	ds_use_integer_for_time = BifConst::LogDataSeries::use_integer_for_time;
	ds_set_separator = ",";

	threading::formatter::Ascii::SeparatorInfo sep_info;
	ascii = new threading::formatter::Ascii(this, sep_info);

	compress_type = Extent::compress_mode_none;
	log_file = 0;
	log_output = 0;
	}

DataSeries::~DataSeries()
	{
	delete ascii;
	}

bool DataSeries::OpenLog(string path)
	{
	log_file = new DataSeriesSink(path + ".ds", compress_type);
	log_file->writeExtentLibrary(log_types);

	for( size_t i = 0; i < schema_list.size(); ++i )
		{
		string fn = schema_list[i].field_name;
		GeneralField* gf = 0;
#ifdef USE_PERFTOOLS_DEBUG
			{
			// GeneralField isn't cleaning up some results of xml parsing, reported
			// here: https://github.com/dataseries/DataSeries/issues/1
			// Ignore for now to make leak tests pass. There's confidence that
			// we do clean up the GeneralField* since the ExtentSeries dtor for
			// member log_series would trigger an assert if dynamically allocated
			// fields aren't deleted beforehand.
			HeapLeakChecker::Disabler disabler;
#endif
			gf = GeneralField::create(log_series, fn);
#ifdef USE_PERFTOOLS_DEBUG
			}
#endif
		extents.insert(std::make_pair(fn, gf));
		}

	if ( ds_extent_size < ROW_MIN )
		{
		Warning(Fmt("%d is not a valid value for 'rows'. Using min of %d instead", (int)ds_extent_size, (int)ROW_MIN));
		ds_extent_size = ROW_MIN;
		}

	else if( ds_extent_size > ROW_MAX )
		{
		Warning(Fmt("%d is not a valid value for 'rows'. Using max of %d instead", (int)ds_extent_size, (int)ROW_MAX));
		ds_extent_size = ROW_MAX;
		}

	log_output = new OutputModule(*log_file, log_series, log_type, ds_extent_size);

	return true;
	}

bool DataSeries::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const * fields)
	{
	// We first construct an XML schema thing (and, if ds_dump_schema is
	// set, dump it to path + ".ds.xml"). Assuming that goes well, we
	// use that schema to build our output logfile and prepare it to be
	// written to.

	// Note: compressor count must be set *BEFORE* DataSeriesSink is
	// instantiated.
	if( ds_num_threads < THREAD_MIN && ds_num_threads != 0 )
		{
		Warning(Fmt("%d is too few threads! Using %d instead", (int)ds_num_threads, (int)THREAD_MIN));
		ds_num_threads = THREAD_MIN;
		}

	if( ds_num_threads > THREAD_MAX )
		{
		Warning(Fmt("%d is too many threads! Dropping back to %d", (int)ds_num_threads, (int)THREAD_MAX));
		ds_num_threads = THREAD_MAX;
		}

	if( ds_num_threads > 0 )
		DataSeriesSink::setCompressorCount(ds_num_threads);

	for ( int i = 0; i < num_fields; i++ )
		{
		const threading::Field* field = fields[i];
		SchemaValue val;
		val.ds_type = GetDSFieldType(field);
		val.field_name = string(field->name);
		val.field_options = GetDSOptionsForType(field);
		val.bro_type = field->TypeName();
		schema_list.push_back(val);
		}

	string schema = BuildDSSchemaFromFieldTypes(schema_list, info.path);

	if( ds_dump_schema )
		{
		string name = string(info.path) + ".ds.xml";
		FILE* pFile = fopen(name.c_str(), "wb");

		if( pFile )
			{
			fwrite(schema.c_str(), 1, schema.length(), pFile);
			fclose(pFile);
			}

		else
			Error(Fmt("cannot dump schema: %s", Strerror(errno)));
		}

	compress_type = Extent::compress_all;

	if( ds_compression == "lzf" )
		compress_type = Extent::compress_mode_lzf;

	else if( ds_compression == "lzo" )
		compress_type = Extent::compress_mode_lzo;

	else if( ds_compression == "zlib" )
		compress_type = Extent::compress_mode_zlib;

	else if( ds_compression == "bz2" )
		compress_type = Extent::compress_mode_bz2;

	else if( ds_compression == "none" )
		compress_type = Extent::compress_mode_none;

	else if( ds_compression == "any" )
		compress_type = Extent::compress_all;

	else
		Warning(Fmt("%s is not a valid compression type. Valid types are: 'lzf', 'lzo', 'zlib', 'bz2', 'none', 'any'. Defaulting to 'any'", ds_compression.c_str()));

	log_type = log_types.registerTypePtr(schema);
	log_series.setType(log_type);

	return OpenLog(info.path);
	}

bool DataSeries::DoFlush(double network_time)
	{
	// Flushing is handled by DataSeries automatically, so this function
	// doesn't do anything.
	return true;
	}

void DataSeries::CloseLog()
	{
	for( ExtentIterator iter = extents.begin(); iter != extents.end(); ++iter )
		delete iter->second;

	extents.clear();

	// Don't delete the file before you delete the output, or bad things
	// will happen.
	delete log_output;
	delete log_file;

	log_output = 0;
	log_file = 0;
	}

bool DataSeries::DoFinish(double network_time)
	{
	CloseLog();
	return true;
	}

bool DataSeries::DoWrite(int num_fields, const threading::Field* const * fields,
			 threading::Value** vals)
	{
	log_output->newRecord();

	for( size_t i = 0; i < (size_t)num_fields; ++i )
		{
		ExtentIterator iter = extents.find(fields[i]->name);
		assert(iter != extents.end());

		if( iter != extents.end() )
			{
			GeneralField *cField = iter->second;

			if( vals[i]->present )
				cField->set(LogValueToString(vals[i]));
			}
		}

	return true;
	}

bool DataSeries::DoRotate(const char* rotated_path, double open, double close, bool terminating)
	{
	// Note that if DS files are rotated too often, the aggregate log
	// size will be (much) larger.
	CloseLog();

	string dsname = string(Info().path) + ".ds";
	string nname = string(rotated_path) + ".ds";

	if ( rename(dsname.c_str(), nname.c_str()) != 0 )
		{
		char buf[256];
		strerror_r(errno, buf, sizeof(buf));
		Error(Fmt("failed to rename %s to %s: %s", dsname.c_str(),
			  nname.c_str(), buf));
		FinishedRotation();
		return false;
		}

	if ( ! FinishedRotation(nname.c_str(), dsname.c_str(), open, close, terminating) )
		{
		Error(Fmt("error rotating %s to %s", dsname.c_str(), nname.c_str()));
		return false;
		}

	return OpenLog(Info().path);
	}

bool DataSeries::DoSetBuf(bool enabled)
	{
	// DataSeries is *always* buffered to some degree. This option is ignored.
	return true;
	}

bool DataSeries::DoHeartbeat(double network_time, double current_time)
	{
	return true;
	}
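When ds_use_integer_for_time is set, LogValueToString writes TYPE_TIME and TYPE_INTERVAL as integer microseconds by scaling Bro's double timestamps by TIME_SCALE (10^6), which is what the units="microseconds" epoch="unix" options from GetDSOptionsForType describe. A minimal standalone sketch of that encoding and its round trip; the timestamp value and variable names here are illustrative, not part of the writer:

#include <cstdint>
#include <cstdio>

// Mirrors DataSeries::TIME_SCALE: fixed-point multiplier used when
// times are stored as integers (microseconds since the Unix epoch).
static const uint64_t TIME_SCALE = 1000000;

int main()
	{
	double t = 1398529018.834652;                  // example epoch timestamp
	uint64_t us = (uint64_t)(TIME_SCALE * t);      // encode, as LogValueToString does
	double back = (double)us / (double)TIME_SCALE; // decode; precision capped at 1 us
	printf("%.6f -> %llu us -> %.6f\n", t, (unsigned long long)us, back);
	return 0;
	}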
src/logging/writers/dataseries/DataSeries.h
@@ -1,128 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// A binary log writer producing DataSeries output. See doc/data-series.rst
// for more information.

#ifndef LOGGING_WRITER_DATA_SERIES_H
#define LOGGING_WRITER_DATA_SERIES_H

#include <DataSeries/ExtentType.hpp>
#include <DataSeries/DataSeriesFile.hpp>
#include <DataSeries/DataSeriesModule.hpp>
#include <DataSeries/GeneralField.hpp>

#include "logging/WriterBackend.h"
#include "threading/formatters/Ascii.h"

namespace logging { namespace writer {

class DataSeries : public WriterBackend {
public:
	DataSeries(WriterFrontend* frontend);
	~DataSeries();

	static WriterBackend* Instantiate(WriterFrontend* frontend)
		{ return new DataSeries(frontend); }

protected:
	// Overridden from WriterBackend.

	virtual bool DoInit(const WriterInfo& info, int num_fields,
			    const threading::Field* const * fields);

	virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
			     threading::Value** vals);
	virtual bool DoSetBuf(bool enabled);
	virtual bool DoRotate(const char* rotated_path, double open,
			      double close, bool terminating);
	virtual bool DoFlush(double network_time);
	virtual bool DoFinish(double network_time);
	virtual bool DoHeartbeat(double network_time, double current_time);

private:
	static const size_t ROW_MIN = 2048;                // Minimum extent size.
	static const size_t ROW_MAX = (1024 * 1024 * 100); // Maximum extent size.
	static const size_t THREAD_MIN = 1;   // Minimum number of compression threads that DataSeries may spawn.
	static const size_t THREAD_MAX = 128; // Maximum number of compression threads that DataSeries may spawn.
	static const size_t TIME_SCALE = 1000000; // Fixed-point multiplier for time values when converted to integers.
	const char* TIME_UNIT() { return "microseconds"; } // DS name for time resolution when converted to integers. Must match TIME_SCALE.

	struct SchemaValue
		{
		string ds_type;
		string bro_type;
		string field_name;
		string field_options;
		};

	/**
	 * Turns a log value into a std::string. Uses an ostringstream to do the
	 * heavy lifting, but still needs to switch on the type to know which
	 * value in the union to hand to the stream for processing.
	 *
	 * @param val The value we wish to convert to a string.
	 * @return The string value of val.
	 */
	std::string LogValueToString(threading::Value *val);

	/**
	 * Takes a field type and converts it to a relevant DataSeries type.
	 *
	 * @param field We extract the type from this and convert it into a relevant DS type.
	 * @return String representation of type that DataSeries can understand.
	 */
	string GetDSFieldType(const threading::Field *field);

	/**
	 * Are there any options we should put into the XML schema?
	 *
	 * @param field We extract the type from this and return any options that make sense for that type.
	 * @return Options that can be added directly to the XML (e.g. "pack_relative=\"yes\"").
	 */
	std::string GetDSOptionsForType(const threading::Field *field);

	/**
	 * Takes a list of types, a list of names, and a title, and uses them to
	 * construct a valid DataSeries XML schema, which is then returned as a
	 * std::string.
	 *
	 * @param vals SchemaValue entries describing each field, including any options to append (e.g. "pack_relative=yes").
	 * @param sTitle Name of this schema. Ideally, these schemas would be aggregated and re-used.
	 */
	string BuildDSSchemaFromFieldTypes(const vector<SchemaValue>& vals, string sTitle);

	/** Closes the currently open file. */
	void CloseLog();

	/** Opens a new file. */
	bool OpenLog(string path);

	typedef std::map<string, GeneralField *> ExtentMap;
	typedef ExtentMap::iterator ExtentIterator;

	// Internal DataSeries structures we need to keep track of.
	vector<SchemaValue> schema_list;
	ExtentTypeLibrary log_types;
	ExtentType::Ptr log_type;
	ExtentSeries log_series;
	ExtentMap extents;
	int compress_type;

	DataSeriesSink* log_file;
	OutputModule* log_output;

	// Options set from the script-level.
	uint64 ds_extent_size;
	uint64 ds_num_threads;
	string ds_compression;
	bool ds_dump_schema;
	bool ds_use_integer_for_time;
	string ds_set_separator;

	threading::formatter::Ascii* ascii;
};

}
}

#endif
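For concreteness, this is the shape of the ExtentType XML that BuildDSSchemaFromFieldTypes produces, shown as a C++ string constant. The stream name "conn" and the two fields are invented for illustration; the attributes follow GetDSFieldType and GetDSOptionsForType above with ds_use_integer_for_time enabled:

// Illustrative only: the schema the writer would build for a stream "conn"
// with fields ts (time) and uid (string). The trailing XML comments map each
// DataSeries type back to its Bro type, as the second loop above emits.
static const char* example_schema =
	"<ExtentType name=\"conn\" version=\"1.0\" namespace=\"bro.org\">\n"
	"\t<field type=\"int64\" name=\"ts\" pack_relative=\"ts\" units=\"microseconds\" epoch=\"unix\"/>\n"
	"\t<field type=\"variable32\" name=\"uid\" pack_unique=\"yes\"/>\n"
	"</ExtentType>\n"
	"<!-- ts : time -->\n"
	"<!-- uid : string -->\n";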
src/logging/writers/dataseries/Plugin.cc
@@ -1,25 +0,0 @@
// See the file in the main distribution directory for copyright.

#include "plugin/Plugin.h"

#include "DataSeries.h"

namespace plugin {
namespace Bro_DataSeriesWriter {

class Plugin : public plugin::Plugin {
public:
	plugin::Configuration Configure()
		{
		AddComponent(new ::logging::Component("DataSeries", ::logging::writer::DataSeries::Instantiate));

		plugin::Configuration config;
		config.name = "Bro::DataSeriesWriter";
		config.description = "DataSeries log writer";
		return config;
		}
} plugin;

}
}
src/logging/writers/dataseries/dataseries.bif
@@ -1,10 +0,0 @@
# Options for the DataSeries writer.

module LogDataSeries;

const compression: string;
const extent_size: count;
const dump_schema: bool;
const use_integer_for_time: bool;
const num_threads: count;
src/logging/writers/elasticsearch/CMakeLists.txt
@@ -1,15 +0,0 @@
include(BroPlugin)

find_package(LibCURL)

if (NOT DISABLE_ELASTICSEARCH AND LIBCURL_FOUND)
	include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
	bro_plugin_begin(Bro ElasticSearchWriter)
	bro_plugin_cc(ElasticSearch.cc Plugin.cc)
	bro_plugin_bif(elasticsearch.bif)
	bro_plugin_link_library(${LibCURL_LIBRARIES})
	bro_plugin_end()
endif()
src/logging/writers/elasticsearch/ElasticSearch.cc
@@ -1,290 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// This is experimental code that is not yet ready for production usage.
//

#include "config.h"

#include "util.h" // Needs to come first for stdint.h

#include <string>
#include <errno.h>
#include <curl/curl.h>
#include <curl/easy.h>

#include "BroString.h"
#include "threading/SerialTypes.h"

#include "ElasticSearch.h"
#include "elasticsearch.bif.h"

using namespace logging;
using namespace writer;
using threading::Value;
using threading::Field;

ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
	{
	cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
	cluster_name = new char[cluster_name_len + 1];
	memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
	cluster_name[cluster_name_len] = 0;

	index_prefix = string((const char*) BifConst::LogElasticSearch::index_prefix->Bytes(), BifConst::LogElasticSearch::index_prefix->Len());

	es_server = string(Fmt("http://%s:%d", BifConst::LogElasticSearch::server_host->Bytes(),
			       (int) BifConst::LogElasticSearch::server_port));
	bulk_url = string(Fmt("%s/_bulk", es_server.c_str()));

	http_headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
	buffer.Clear();
	counter = 0;
	current_index = string();
	prev_index = string();
	last_send = current_time();
	failing = false;

	transfer_timeout = static_cast<long>(BifConst::LogElasticSearch::transfer_timeout);

	curl_handle = HTTPSetup();

	json = new threading::formatter::JSON(this, threading::formatter::JSON::TS_MILLIS);
	}

ElasticSearch::~ElasticSearch()
	{
	delete [] cluster_name;
	delete json;
	}

bool ElasticSearch::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
	{
	return true;
	}

bool ElasticSearch::DoFlush(double network_time)
	{
	BatchIndex();
	return true;
	}

bool ElasticSearch::DoFinish(double network_time)
	{
	BatchIndex();
	curl_slist_free_all(http_headers);
	curl_easy_cleanup(curl_handle);
	return true;
	}

bool ElasticSearch::BatchIndex()
	{
	curl_easy_reset(curl_handle);
	curl_easy_setopt(curl_handle, CURLOPT_URL, bulk_url.c_str());
	curl_easy_setopt(curl_handle, CURLOPT_POST, 1);
	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t)buffer.Len());
	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
	failing = ! HTTPSend(curl_handle);

	// We are currently throwing the data out regardless of whether the send failed. Fire and forget!
	buffer.Clear();
	counter = 0;
	last_send = current_time();

	return true;
	}

bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
			    Value** vals)
	{
	if ( current_index.empty() )
		UpdateIndex(network_time, Info().rotation_interval, Info().rotation_base);

	// Our action line looks like:
	buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
	buffer.Add(current_index);
	buffer.AddRaw("\",\"_type\":\"", 11);
	buffer.Add(Info().path);
	buffer.AddRaw("\"}}\n", 4);

	json->Describe(&buffer, num_fields, fields, vals);

	buffer.AddRaw("\n", 1);

	counter++;
	if ( counter >= BifConst::LogElasticSearch::max_batch_size ||
	     uint(buffer.Len()) >= BifConst::LogElasticSearch::max_byte_size )
		BatchIndex();

	return true;
	}

bool ElasticSearch::UpdateIndex(double now, double rinterval, double rbase)
	{
	if ( rinterval == 0 )
		{
		// If logs aren't being rotated, don't use a rotation-oriented index name.
		current_index = index_prefix;
		}
	else
		{
		double nr = calc_next_rotate(now, rinterval, rbase);
		double interval_beginning = now - (rinterval - nr);

		struct tm tm;
		char buf[128];
		time_t teatime = (time_t)interval_beginning;
		localtime_r(&teatime, &tm);
		strftime(buf, sizeof(buf), "%Y%m%d%H%M", &tm);

		prev_index = current_index;
		current_index = index_prefix + "-" + buf;

		// Send some metadata about this index.
		buffer.AddRaw("{\"index\":{\"_index\":\"@", 21);
		buffer.Add(index_prefix);
		buffer.AddRaw("-meta\",\"_type\":\"index\",\"_id\":\"", 30);
		buffer.Add(current_index);
		buffer.AddRaw("-", 1);
		buffer.Add(Info().rotation_base);
		buffer.AddRaw("-", 1);
		buffer.Add(Info().rotation_interval);
		buffer.AddRaw("\"}}\n{\"name\":\"", 13);
		buffer.Add(current_index);
		buffer.AddRaw("\",\"start\":", 10);
		buffer.Add(interval_beginning);
		buffer.AddRaw(",\"end\":", 7);
		buffer.Add(interval_beginning + rinterval);
		buffer.AddRaw("}\n", 2);
		}

	//printf("%s - prev:%s current:%s\n", Info().path.c_str(), prev_index.c_str(), current_index.c_str());
	return true;
	}


bool ElasticSearch::DoRotate(const char* rotated_path, double open, double close, bool terminating)
	{
	// Update the currently used index to the new rotation interval.
	UpdateIndex(close, Info().rotation_interval, Info().rotation_base);

	// Only do this stuff if there was a previous index.
	if ( ! prev_index.empty() )
		{
		// FIXME: I think this section is taking too long and causing the thread to die.

		// Compress the previous index.
		//curl_easy_reset(curl_handle);
		//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_settings", es_server.c_str(), prev_index.c_str()));
		//curl_easy_setopt(curl_handle, CURLOPT_CUSTOMREQUEST, "PUT");
		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, "{\"index\":{\"store.compress.stored\":\"true\"}}");
		//curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) 42);
		//HTTPSend(curl_handle);

		// Optimize the previous index.
		// TODO: make this into variables.
		//curl_easy_reset(curl_handle);
		//curl_easy_setopt(curl_handle, CURLOPT_URL, Fmt("%s/%s/_optimize?max_num_segments=1&wait_for_merge=false", es_server.c_str(), prev_index.c_str()));
		//HTTPSend(curl_handle);
		}

	if ( ! FinishedRotation(current_index.c_str(), prev_index.c_str(), open, close, terminating) )
		Error(Fmt("error rotating %s to %s", prev_index.c_str(), current_index.c_str()));

	return true;
	}

bool ElasticSearch::DoSetBuf(bool enabled)
	{
	// Nothing to do.
	return true;
	}

bool ElasticSearch::DoHeartbeat(double network_time, double current_time)
	{
	if ( last_send > 0 && buffer.Len() > 0 &&
	     current_time - last_send > BifConst::LogElasticSearch::max_batch_interval )
		{
		BatchIndex();
		}

	return true;
	}


CURL* ElasticSearch::HTTPSetup()
	{
	CURL* handle = curl_easy_init();
	if ( ! handle )
		{
		Error("cURL did not initialize correctly.");
		return 0;
		}

	return handle;
	}

size_t ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
	{
	// TODO: Do some verification on the result?
	return size;
	}

bool ElasticSearch::HTTPSend(CURL *handle)
	{
	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, http_headers);
	curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, &logging::writer::ElasticSearch::HTTPReceive); // This gets called with the result.

	// HTTP 1.1 likes to use chunked-encoded transfers, which aren't good for speed.
	// The best (only?) way to disable that is to just use HTTP 1.0.
	curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);

	// Some timeout options. These will need more attention later.
	curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1);
	curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, transfer_timeout);
	curl_easy_setopt(handle, CURLOPT_TIMEOUT, transfer_timeout);
	curl_easy_setopt(handle, CURLOPT_DNS_CACHE_TIMEOUT, 60*60);

	CURLcode return_code = curl_easy_perform(handle);

	switch ( return_code )
		{
		case CURLE_COULDNT_CONNECT:
		case CURLE_COULDNT_RESOLVE_HOST:
		case CURLE_WRITE_ERROR:
		case CURLE_RECV_ERROR:
			{
			if ( ! failing )
				Error(Fmt("ElasticSearch server may not be accessible."));

			break;
			}

		case CURLE_OPERATION_TIMEDOUT:
			{
			if ( ! failing )
				Warning(Fmt("HTTP operation with elasticsearch server timed out at %" PRIu64 " msecs.", transfer_timeout));

			break;
			}

		case CURLE_OK:
			{
			long http_code = 0;
			curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &http_code);
			if ( http_code == 200 )
				// Hopefully everything goes through here.
				return true;
			else if ( ! failing )
				Error(Fmt("Received a non-successful status code back from ElasticSearch server, check the elasticsearch server log."));

			break;
			}

		default:
			{
			break;
			}
		}

	// The "successful" return happens above.
	return false;
	}
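BatchIndex and HTTPSend together amount to a plain libcurl POST of the buffered bulk body. A self-contained sketch of the same pattern, assuming a local ElasticSearch at a placeholder URL, with a hard-coded body and error handling reduced to the HTTP status check:

#include <curl/curl.h>
#include <cstdio>

// Swallow the response body; the writer's HTTPReceive similarly ignores it.
static size_t discard(void* ptr, size_t size, size_t nmemb, void* userdata)
	{
	return size * nmemb;
	}

int main()
	{
	curl_global_init(CURL_GLOBAL_ALL);
	CURL* h = curl_easy_init();
	if ( ! h )
		return 1;

	// One bulk entry: action line, then the JSON document, each newline-terminated.
	const char* body = "{\"index\":{\"_index\":\"bro-test\",\"_type\":\"conn\"}}\n{\"ts\":1.0}\n";

	struct curl_slist* headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
	curl_easy_setopt(h, CURLOPT_URL, "http://localhost:9200/_bulk"); // placeholder server
	curl_easy_setopt(h, CURLOPT_HTTPHEADER, headers);
	curl_easy_setopt(h, CURLOPT_POSTFIELDS, body);
	curl_easy_setopt(h, CURLOPT_WRITEFUNCTION, discard);
	curl_easy_setopt(h, CURLOPT_NOSIGNAL, 1); // as in HTTPSend: signals and threads don't mix
	curl_easy_setopt(h, CURLOPT_TIMEOUT, 2L);

	long code = 0;
	if ( curl_easy_perform(h) == CURLE_OK )
		curl_easy_getinfo(h, CURLINFO_RESPONSE_CODE, &code);
	printf("HTTP %ld\n", code);

	curl_slist_free_all(headers);
	curl_easy_cleanup(h);
	curl_global_cleanup();
	return code == 200 ? 0 : 1;
	}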
src/logging/writers/elasticsearch/ElasticSearch.h
@@ -1,86 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Log writer for writing to an ElasticSearch database.
//
// This is experimental code that is not yet ready for production usage.
//

#ifndef LOGGING_WRITER_ELASTICSEARCH_H
#define LOGGING_WRITER_ELASTICSEARCH_H

#include <curl/curl.h>

#include "logging/WriterBackend.h"
#include "threading/formatters/JSON.h"

namespace logging { namespace writer {

class ElasticSearch : public WriterBackend {
public:
	ElasticSearch(WriterFrontend* frontend);
	~ElasticSearch();

	static string LogExt();

	static WriterBackend* Instantiate(WriterFrontend* frontend)
		{ return new ElasticSearch(frontend); }

protected:
	// Overridden from WriterBackend.

	virtual bool DoInit(const WriterInfo& info, int num_fields,
			    const threading::Field* const* fields);

	virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
			     threading::Value** vals);
	virtual bool DoSetBuf(bool enabled);
	virtual bool DoRotate(const char* rotated_path, double open,
			      double close, bool terminating);
	virtual bool DoFlush(double network_time);
	virtual bool DoFinish(double network_time);
	virtual bool DoHeartbeat(double network_time, double current_time);

private:
	bool AddFieldToBuffer(ODesc *b, threading::Value* val, const threading::Field* field);
	bool AddValueToBuffer(ODesc *b, threading::Value* val);
	bool BatchIndex();
	bool SendMappings();
	bool UpdateIndex(double now, double rinterval, double rbase);

	CURL* HTTPSetup();
	size_t HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
	bool HTTPSend(CURL *handle);

	// Buffers, etc.
	ODesc buffer;
	uint64 counter;
	double last_send;
	string current_index;
	string prev_index;

	CURL* curl_handle;

	// From scripts
	char* cluster_name;
	int cluster_name_len;

	string es_server;
	string bulk_url;

	struct curl_slist *http_headers;

	string path;
	string index_prefix;
	long transfer_timeout;
	bool failing;

	uint64 batch_size;

	threading::formatter::JSON* json;
};

}
}

#endif
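DoWrite frames every record for ElasticSearch's bulk API: an action line naming the target index and type, then the JSON document, each terminated by a newline. A hedged sketch of that framing using std::string instead of the writer's ODesc; the index and path values are invented for illustration:

#include <string>
#include <iostream>

// Builds one bulk-API entry the way ElasticSearch::DoWrite does:
// {"index":{"_index":<index>,"_type":<path>}}\n<json doc>\n
static std::string bulk_entry(const std::string& index, const std::string& path,
			      const std::string& json_doc)
	{
	std::string out;
	out += "{\"index\":{\"_index\":\"" + index + "\",\"_type\":\"" + path + "\"}}\n";
	out += json_doc + "\n";
	return out;
	}

int main()
	{
	// Hypothetical values: a rotation-stamped index name and a log path.
	std::cout << bulk_entry("bro-201410011200", "conn",
				"{\"ts\":1412164800.0,\"uid\":\"C1\"}");
	return 0;
	}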
src/logging/writers/elasticsearch/Plugin.cc
@@ -1,37 +0,0 @@
// See the file in the main distribution directory for copyright.

#include <curl/curl.h>

#include "plugin/Plugin.h"

#include "ElasticSearch.h"

namespace plugin {
namespace Bro_ElasticSearchWriter {

class Plugin : public plugin::Plugin {
public:
	plugin::Configuration Configure()
		{
		AddComponent(new ::logging::Component("ElasticSearch", ::logging::writer::ElasticSearch::Instantiate));

		plugin::Configuration config;
		config.name = "Bro::ElasticSearchWriter";
		config.description = "ElasticSearch log writer";
		return config;
		}

	virtual void InitPreScript()
		{
		curl_global_init(CURL_GLOBAL_ALL);
		}

	virtual void Done()
		{
		curl_global_cleanup();
		}

} plugin;

}
}
src/logging/writers/elasticsearch/elasticsearch.bif
@@ -1,14 +0,0 @@
# Options for the ElasticSearch writer.

module LogElasticSearch;

const cluster_name: string;
const server_host: string;
const server_port: count;
const index_prefix: string;
const type_prefix: string;
const transfer_timeout: interval;
const max_batch_size: count;
const max_batch_interval: interval;
const max_byte_size: count;