Reworked bulk operation string construction to use ODesc and added json escaping.

Seth Hall 2012-06-15 16:30:54 -04:00
parent d3bb4617e9
commit 0bb8b69c95
2 changed files with 187 additions and 243 deletions

ElasticSearch.cc

@@ -8,6 +8,7 @@
 #include <errno.h>
 #include "util.h"
+#include "BroString.h"
 #include "NetVar.h"
 #include "threading/SerialTypes.h"
@@ -22,8 +23,6 @@ using namespace writer;
 using threading::Value;
 using threading::Field;
 
-#define MAX_EVENT_SIZE 1024
-
 ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
 	{
 	cluster_name_len = BifConst::LogElasticSearch::cluster_name->Len();
@@ -31,27 +30,7 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
 	memcpy(cluster_name, BifConst::LogElasticSearch::cluster_name->Bytes(), cluster_name_len);
 	cluster_name[cluster_name_len] = 0;
 
-	server_host_len = BifConst::LogElasticSearch::server_host->Len();
-	server_host = new char[server_host_len + 1];
-	memcpy(server_host, BifConst::LogElasticSearch::server_host->Bytes(), server_host_len);
-	server_host[server_host_len] = 0;
-
-	index_name_len = BifConst::LogElasticSearch::index_name->Len();
-	index_name = new char[index_name_len + 1];
-	memcpy(index_name, BifConst::LogElasticSearch::index_name->Bytes(), index_name_len);
-	index_name[index_name_len] = 0;
-
-	type_prefix_len = BifConst::LogElasticSearch::type_prefix->Len();
-	type_prefix = new char[type_prefix_len + 1];
-	memcpy(type_prefix, BifConst::LogElasticSearch::type_prefix->Bytes(), type_prefix_len);
-	type_prefix[type_prefix_len] = 0;
-
-	server_port = BifConst::LogElasticSearch::server_port;
-	batch_size = BifConst::LogElasticSearch::batch_size;
-	buffer = (char *)safe_malloc(MAX_EVENT_SIZE * batch_size);
-	current_offset = 0;
-	buffer[current_offset] = 0;
+	buffer.Clear();
 	counter = 0;
 
 	curl_handle = HTTPSetup();
@@ -61,10 +40,6 @@ ElasticSearch::ElasticSearch(WriterFrontend* frontend) : WriterBackend(frontend)
 ElasticSearch::~ElasticSearch()
 	{
 	delete [] cluster_name;
-	delete [] server_host;
-	delete [] index_name;
-	delete [] type_prefix;
-	delete [] buffer;
 	}
 
 bool ElasticSearch::DoInit(string path, int num_fields, const Field* const * fields)
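The constructor and destructor shrink because the hand-managed output buffer is gone: there is no more safe_malloc of MAX_EVENT_SIZE * batch_size, no current_offset bookkeeping, and nothing left to delete [] beyond cluster_name. The writer now appends into an ODesc, Bro's growable buffer type. As a rough standalone sketch of the small append-only interface the new code relies on (an illustrative stand-in with a made-up name, not Bro's actual ODesc class):

#include <cstdint>
#include <string>

// Illustrative stand-in for the subset of the ODesc interface used here:
// an append-only byte buffer that grows on demand.
class GrowableBuffer {
public:
	void Clear()                             { data.clear(); }
	void Add(const std::string& s)           { data += s; }
	void Add(uint64_t n)                     { data += std::to_string(n); }
	void Add(double d)                       { data += std::to_string(d); }
	void AddRaw(const char* bytes, size_t n) { data.append(bytes, n); }
	const char* Bytes() const                { return data.data(); }
	size_t Len() const                       { return data.size(); }

private:
	std::string data; // grows automatically; no fixed MAX_EVENT_SIZE cap
};

Because the buffer grows as values are appended, the fixed-size sprintf()/strcat() calls below can be replaced with Add()/AddRaw() without risking overflow of a preallocated event buffer.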
@@ -84,168 +59,149 @@ bool ElasticSearch::DoFinish()
 	return WriterBackend::DoFinish();
 	}
 
-bool ElasticSearch::BatchIndex()
+bool ElasticSearch::AddFieldValueToBuffer(Value* val, const Field* field)
 	{
-	return HTTPSend();
-	}
-
-char* ElasticSearch::FieldToString(Value* val, const Field* field)
-	{
-	char* result = new char[MAX_EVENT_SIZE];
-	switch ( val->type ) {
+	switch ( val->type )
+		{
 		// ElasticSearch defines bools as: 0 == false, everything else == true. So we treat it as an int.
 		case TYPE_BOOL:
 		case TYPE_INT:
-			sprintf(result, "%d", (int) val->val.int_val); return result;
+			buffer.Add(val->val.int_val);
+			break;
 
 		case TYPE_COUNT:
 		case TYPE_COUNTER:
-			sprintf(result, "%d", (int) val->val.uint_val); return result;
+			buffer.Add(val->val.uint_val);
+			break;
 
 		case TYPE_PORT:
-			sprintf(result, "%d", (int) val->val.port_val.port); return result;
+			buffer.Add(val->val.port_val.port);
+			break;
 
 		case TYPE_SUBNET:
-			sprintf(result, "\"%s\"", Render(val->val.subnet_val).c_str()); return result;
+			buffer.AddRaw("\"", 1);
+			buffer.Add(Render(val->val.subnet_val));
+			buffer.AddRaw("\"", 1);
+			break;
 
 		case TYPE_ADDR:
-			sprintf(result, "\"%s\"", Render(val->val.addr_val).c_str()); return result;
+			buffer.AddRaw("\"", 1);
+			buffer.Add(Render(val->val.addr_val));
+			buffer.AddRaw("\"", 1);
+			break;
+
+		case TYPE_DOUBLE:
+			buffer.Add(val->val.double_val);
+			break;
 
 		case TYPE_INTERVAL:
 		case TYPE_TIME:
-			sprintf(result, "%"PRIu64"", (uint64) (val->val.double_val * 1000)); return result;
-
-		case TYPE_DOUBLE:
-			sprintf(result, "%s", Render(val->val.double_val).c_str()); return result;
+			// ElasticSearch uses milliseconds for timestamps
+			buffer.Add((uint64_t) (val->val.double_val * 1000));
+			break;
 
 		case TYPE_ENUM:
 		case TYPE_STRING:
 		case TYPE_FILE:
 		case TYPE_FUNC:
 			{
-			int size = val->val.string_val->size();
-			const char* data = val->val.string_val->data();
-			if ( ! size )
-				return 0;
-			sprintf(result, "\"%s\"", data); return result;
+			buffer.AddRaw("\"", 1);
+			for ( uint i = 0; i < val->val.string_val->size(); ++i )
+				{
+				char c = val->val.string_val->data()[i];
+				// HTML entity encode special characters.
+				if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' )
+					{
+					buffer.AddRaw("&#", 2);
+					buffer.Add((uint8_t) c);
+					buffer.AddRaw(";", 1);
+					}
+				else
+					buffer.AddRaw(&c, 1);
+				}
+			buffer.AddRaw("\"", 1);
+			break;
 			}
 
 		case TYPE_TABLE:
 			{
-			char* tmp = new char[MAX_EVENT_SIZE];
-			int tmp_offset = 0;
-			strcpy(tmp, "{");
-			tmp_offset = 1;
-			bool result_seen = false;
+			buffer.AddRaw("[", 1);
 			for ( int j = 0; j < val->val.set_val.size; j++ )
 				{
-				char* sub_field = FieldToString(val->val.set_val.vals[j], field);
-				if ( sub_field ){
-					if ( result_seen ){
-						strcpy(tmp + tmp_offset, ",");
-						tmp_offset += 1;
-						}
-					else
-						result_seen = true;
-					sprintf(tmp + tmp_offset, "\"%s\":%s", field->name.c_str(), sub_field);
-					tmp_offset = strlen(tmp);
-					}
+				if ( j > 0 )
+					buffer.AddRaw(",", 1);
+				AddFieldValueToBuffer(val->val.set_val.vals[j], field);
 				}
-			strcpy(tmp + tmp_offset, "}");
-			tmp_offset += 1;
-			sprintf(result, "%s", tmp);
-			return result;
+			buffer.AddRaw("]", 1);
+			break;
 			}
 
 		case TYPE_VECTOR:
 			{
-			char* tmp = new char[MAX_EVENT_SIZE];
-			int tmp_offset = 0;
-			strcpy(tmp, "{");
-			tmp_offset = 1;
-			bool result_seen = false;
+			buffer.AddRaw("[", 1);
 			for ( int j = 0; j < val->val.vector_val.size; j++ )
 				{
-				char* sub_field = FieldToString(val->val.vector_val.vals[j], field);
-				if ( sub_field ){
-					if ( result_seen ){
-						strcpy(tmp + tmp_offset, ",");
-						tmp_offset += 1;
-						}
-					else
-						result_seen = true;
-					sprintf(tmp + tmp_offset, "\"%s\":%s", field->name.c_str(), sub_field);
-					tmp_offset = strlen(tmp);
-					}
+				if ( j > 0 )
+					buffer.AddRaw(",", 1);
+				AddFieldValueToBuffer(val->val.vector_val.vals[j], field);
 				}
-			strcpy(tmp + tmp_offset, "}");
-			tmp_offset += 1;
-			sprintf(result, "%s", tmp);
-			return result;
+			buffer.AddRaw("]", 1);
+			break;
 			}
 
 		default:
-			{
-			return (char *)"{}";
-			}
+			return false;
 		}
+
+	return true;
 	}
 
-char* ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
+bool ElasticSearch::AddFieldToBuffer(Value* val, const Field* field)
 	{
 	if ( ! val->present )
-		return 0;
-	char* result = new char[MAX_EVENT_SIZE];
-	sprintf(result, "\"%s\":%s", field->name.c_str(), FieldToString(val, field));
-	return result;
+		return false;
+
+	buffer.AddRaw("\"", 1);
+	buffer.Add(field->name);
+	buffer.AddRaw("\":", 2);
+	AddFieldValueToBuffer(val, field);
+	return true;
 	}
 
 bool ElasticSearch::DoWrite(int num_fields, const Field* const * fields,
 							Value** vals)
 	{
 	// Our action line looks like:
-	// {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n{
-
-	bool resultSeen = false;
+	// {"index":{"_index":"$index_name","_type":"$type_prefix$path"}}\n
+	if ( counter == 0 )
+		{
+		buffer.AddRaw("{\"index\":{\"_index\":\"", 20);
+		buffer.AddN((const char*) BifConst::LogElasticSearch::index_name->Bytes(),
+		            BifConst::LogElasticSearch::index_name->Len());
+		buffer.AddRaw("\",\"_type\":\"", 11);
+		buffer.AddN((const char*) BifConst::LogElasticSearch::type_prefix->Bytes(),
+		            BifConst::LogElasticSearch::type_prefix->Len());
+		buffer.Add(Path());
+		buffer.AddRaw("\"}\n", 3);
+		}
 
 	for ( int i = 0; i < num_fields; i++ )
 		{
-		char* result = AddFieldToBuffer(vals[i], fields[i]);
-		if ( result ) {
-			if ( ! resultSeen ) {
-				current_offset += sprintf(buffer + current_offset, "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s%s\"}\n{", index_name, type_prefix, Path().c_str());
-				resultSeen = true;
-				}
-			else {
-				strcat(buffer, ",");
-				current_offset += 1;
-				}
-			strcat(buffer, result);
-			current_offset += strlen(result);
-			}
+		if ( i == 0 )
+			buffer.AddRaw("{", 1);
+		else if ( buffer.Bytes()[buffer.Len()] != ',' && vals[i]->present )
+			buffer.AddRaw(",", 1);
+		AddFieldToBuffer(vals[i], fields[i]);
 		}
 
-	if ( resultSeen ) {
-		strcat(buffer, "}\n");
-		current_offset += 2;
-		counter += 1;
-		if ( counter >= batch_size ){
-			BatchIndex();
-			current_offset = 0;
-			buffer[current_offset] = 0;
-			counter = 0;
-			}
-		}
+	buffer.AddRaw("}\n", 2);
+
+	counter++;
+	if ( counter >= BifConst::LogElasticSearch::batch_size )
+		{
+		HTTPSend();
+		buffer.Clear();
+		counter = 0;
+		}
 
 	return true;
 	}
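The string-typed cases above no longer sprintf() the raw bytes into the document; each character is inspected, and anything outside printable ASCII, plus newline, quotes, and backslash, is written as a numeric &#N; entity so the value cannot break out of the surrounding JSON string. A standalone sketch of the same per-character rule (hypothetical helper name; std::string in place of the ODesc buffer):

#include <cstdint>
#include <string>

// Hypothetical standalone version of the escaping applied to string-like
// values: printable ASCII passes through unchanged, everything else (and
// '\n', '"', '\'', '\\') is emitted as a numeric &#N; entity.
std::string EscapeStringValue(const std::string& in)
	{
	std::string out = "\"";
	for ( char c : in )
		{
		if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' )
			{
			out += "&#";
			out += std::to_string(static_cast<uint8_t>(c));
			out += ";";
			}
		else
			out += c;
		}
	out += "\"";
	return out;
	}

For example, EscapeStringValue("say \"hi\"") returns "say &#34;hi&#34;" (with the surrounding double quotes), since '"' is code point 34.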
@@ -264,8 +220,9 @@ bool ElasticSearch::DoSetBuf(bool enabled)
 // HTTP Functions start here.
 
 CURL* ElasticSearch::HTTPSetup()
 	{
-	char URL[2048];
+	const char *URL = fmt("http://%s:%d/_bulk", BifConst::LogElasticSearch::server_host->CheckString(),
+	                      (int) BifConst::LogElasticSearch::server_port);;
 	CURL* handle;
 	struct curl_slist *headers=NULL;
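For reference, the body eventually POSTed to this _bulk URL is newline-delimited JSON in Elasticsearch's bulk format: an action line naming the target index and type, then the document assembled from the log record's fields, repeated for each buffered record until batch_size is reached. A hedged illustration with made-up index, type, and field names (timestamps are the millisecond values the writer emits):

// Illustration only: "bro", "bro_conn", and the field names are placeholders.
const char* example_bulk_body =
	"{\"index\":{\"_index\":\"bro\",\"_type\":\"bro_conn\"}}\n"
	"{\"ts\":1339792254000,\"id.orig_h\":\"192.168.1.1\",\"uid\":\"abc123\"}\n"
	"{\"index\":{\"_index\":\"bro\",\"_type\":\"bro_conn\"}}\n"
	"{\"ts\":1339792255000,\"id.orig_h\":\"192.168.1.2\",\"uid\":\"def456\"}\n";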
@@ -273,7 +230,7 @@ CURL* ElasticSearch::HTTPSetup()
 	if ( ! handle )
 		return handle;
 
-	sprintf(URL, "http://%s:%d/_bulk", server_host, (int) server_port);
+	//sprintf(URL, "http://%s:%d/_bulk", BifConst::LogElasticSearch::server_host->CheckString(), (int) BifConst::LogElasticSearch::server_port);
 	curl_easy_setopt(handle, CURLOPT_URL, URL);
 
 	headers = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
@@ -286,20 +243,20 @@ CURL* ElasticSearch::HTTPSetup()
 	// just use HTTP 1.0
 	curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);
 	return handle;
 	}
 
-bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata){
+bool ElasticSearch::HTTPReceive(void* ptr, int size, int nmemb, void* userdata)
+	{
 	//TODO: Do some verification on the result?
 	return true;
 	}
 
 bool ElasticSearch::HTTPSend(){
 	CURLcode return_code;
 
 	curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, curl_result);
-	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer);
-	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, current_offset);
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDS, buffer.Bytes());
+	curl_easy_setopt(curl_handle, CURLOPT_POSTFIELDSIZE, buffer.Len());
 
 	return_code = curl_easy_perform(curl_handle);
 	switch(return_code) {
@@ -307,6 +264,7 @@ bool ElasticSearch::HTTPSend(){
 		case CURLE_COULDNT_RESOLVE_HOST:
 		case CURLE_WRITE_ERROR:
 			return false;
+
 		default:
 			return true;
 		}
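With the buffer handled by ODesc, HTTPSend() now hands libcurl the accumulated bytes and their length directly. Stripped of the Bro plumbing, the send path amounts to one libcurl POST per batch; a minimal self-contained sketch under assumed defaults (the hardcoded localhost:9200 stands in for the configured server_host and server_port):

#include <curl/curl.h>
#include <string>

// Minimal sketch of the HTTPSetup()/HTTPSend() pattern: one easy handle
// pointed at the _bulk endpoint, then a POST of the batched body.
bool SendBulk(const std::string& body)
	{
	// In a real program, call curl_global_init(CURL_GLOBAL_ALL) once at startup.
	CURL* handle = curl_easy_init();
	if ( ! handle )
		return false;

	curl_easy_setopt(handle, CURLOPT_URL, "http://localhost:9200/_bulk");

	struct curl_slist* headers =
		curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
	curl_easy_setopt(handle, CURLOPT_HTTPHEADER, headers);

	// Matches the writer's choice to "just use HTTP 1.0".
	curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_0);

	curl_easy_setopt(handle, CURLOPT_POSTFIELDS, body.data());
	curl_easy_setopt(handle, CURLOPT_POSTFIELDSIZE, (long) body.size());

	CURLcode rc = curl_easy_perform(handle);

	curl_slist_free_all(headers);
	curl_easy_cleanup(handle);
	return rc == CURLE_OK;
	}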

ElasticSearch.h

@@ -34,17 +34,15 @@ protected:
 	virtual bool DoFinish();
 
 private:
-	char* AddFieldToBuffer(threading::Value* val, const threading::Field* field);
-	char* FieldToString(threading::Value* val, const threading::Field* field);
-	bool BatchIndex();
+	bool AddFieldToBuffer(threading::Value* val, const threading::Field* field);
+	bool AddFieldValueToBuffer(threading::Value* val, const threading::Field* field);
 
 	CURL* HTTPSetup();
 	bool HTTPReceive(void* ptr, int size, int nmemb, void* userdata);
 	bool HTTPSend();
 
 	// Buffers, etc.
-	char* buffer;
-	int current_offset;
+	ODesc buffer;
 	uint64 counter;
 
 	CURL* curl_handle;
@@ -54,19 +52,7 @@ private:
 	char* cluster_name;
 	int cluster_name_len;
 
-	char* server_host;
-	int server_host_len;
-	uint64 server_port;
-
-	char* index_name;
-	int index_name_len;
-
-	char* type_prefix;
-	int type_prefix_len;
-
 	uint64 batch_size;
 	};
 
 }