Refactored formatters and updated the writers a bit.

- Formatters have been abstracted similarly to readers and writers now.
 - The Ascii writer has a new option for writing out logs as JSON.
 - The Ascii writer now has all options available as per-filter
   options as well as global.
This commit is contained in:
Seth Hall 2014-03-10 10:42:59 -04:00
parent 83ec05bb4a
commit a56c343715
25 changed files with 750 additions and 428 deletions

149
src/threading/Formatter.cc Normal file
View file

@ -0,0 +1,149 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "config.h"
#include <sstream>
#include <errno.h>
#include "Formatter.h"
#include "bro_inet_ntop.h"
using namespace threading;
using namespace formatter;
using threading::Value;
using threading::Field;
// Constructs the formatter and remembers the thread that uses it. The
// thread's Error(), Warning() and Fmt() methods are called by the helpers
// in this class.
Formatter::Formatter(threading::MsgThread* t)
	: thread(t)
	{
	}
// Destructor. Nothing to release here: the thread pointer is only stored,
// never deleted by this class.
Formatter::~Formatter()
{
}
// Checks the outcome of a string-to-number conversion and reports any
// problem through the owning thread.
//
// NOTE(review): presumably called directly after a strtol()/strtod()-style
// call, with `end` being the end pointer that call produced and errno still
// holding its status -- confirm against the callers.
//
// s:   The string that was converted.
// end: Pointer to the first character the conversion did not consume.
//
// Returns true if the conversion has to be treated as failed (empty input,
// nothing parseable, or errno set to EINVAL/ERANGE) and false otherwise.
// Trailing non-numeric characters only produce a warning.
bool Formatter::CheckNumberError(const string& s, const char* end) const
	{
	// Snapshot errno before doing anything else: the reporting calls below
	// may modify it, which would hide or fake a conversion error.
	const int conversion_errno = errno;

	// Do this check first, before executing s.c_str() or similar.
	// otherwise the value to which *end is pointing at the moment might
	// be gone ...
	bool endnotnull = (*end != '\0');

	if ( s.length() == 0 )
		{
		thread->Error("Got empty string for number field");
		return true;
		}

	if ( end == s.c_str() )
		{
		thread->Error(thread->Fmt("String '%s' contained no parseable number", s.c_str()));
		return true;
		}

	if ( endnotnull )
		thread->Warning(thread->Fmt("Number '%s' contained non-numeric trailing characters. Ignored trailing characters '%s'", s.c_str(), end));

	if ( conversion_errno == EINVAL )
		{
		thread->Error(thread->Fmt("String '%s' could not be converted to a number", s.c_str()));
		return true;
		}

	else if ( conversion_errno == ERANGE )
		{
		thread->Error(thread->Fmt("Number '%s' out of supported range.", s.c_str()));
		return true;
		}

	return false;
	}
// Renders an IP address as text via bro_inet_ntop(). Any family other than
// IPv4 is rendered as IPv6. If the conversion fails, a fixed placeholder
// string is returned instead.
string Formatter::Render(const threading::Value::addr_t& addr) const
	{
	if ( addr.family != IPv4 )
		{
		char text6[INET6_ADDRSTRLEN];

		if ( bro_inet_ntop(AF_INET6, &addr.in.in6, text6, INET6_ADDRSTRLEN) )
			return text6;

		return "<bad IPv6 address conversion>";
		}

	char text4[INET_ADDRSTRLEN];

	if ( bro_inet_ntop(AF_INET, &addr.in.in4, text4, INET_ADDRSTRLEN) )
		return text4;

	return "<bad IPv4 address conversion>";
	}
// Maps the strings "tcp", "udp", "icmp" and "unknown" to the corresponding
// TransportProto value. Any other input is reported as an error through the
// thread and yields TRANSPORT_UNKNOWN.
TransportProto Formatter::ParseProto(const string &proto) const
	{
	if ( proto == "tcp" )
		return TRANSPORT_TCP;

	if ( proto == "udp" )
		return TRANSPORT_UDP;

	if ( proto == "icmp" )
		return TRANSPORT_ICMP;

	// "unknown" is a legal value; everything else is an error.
	if ( proto != "unknown" )
		thread->Error(thread->Fmt("Tried to parse invalid/unknown protocol: %s", proto.c_str()));

	return TRANSPORT_UNKNOWN;
	}
// More or less verbose copy from IPAddr.cc -- which uses reporter.
//
// Parses a textual IP address. A string containing a ':' is treated as
// IPv6, anything else as IPv4. On a parse failure an error is reported
// through the thread and the address bytes are zeroed.
threading::Value::addr_t Formatter::ParseAddr(const string &s) const
	{
	threading::Value::addr_t result;

	const bool looks_like_v6 = ( s.find(':') != std::string::npos );

	if ( looks_like_v6 )
		{
		result.family = IPv6;

		const int rc = inet_pton(AF_INET6, s.c_str(), result.in.in6.s6_addr);

		if ( rc <= 0 )
			{
			thread->Error(thread->Fmt("Bad address: %s", s.c_str()));
			memset(result.in.in6.s6_addr, 0, sizeof(result.in.in6.s6_addr));
			}

		return result;
		}

	result.family = IPv4;

	const int rc = inet_aton(s.c_str(), &(result.in.in4));

	if ( rc <= 0 )
		{
		thread->Error(thread->Fmt("Bad address: %s", s.c_str()));
		memset(&result.in.in4.s_addr, 0, sizeof(result.in.in4.s_addr));
		}

	return result;
	}
// Renders a subnet as "<prefix>/<length>". For IPv4 prefixes 96 is
// subtracted from the stored length before printing.
// NOTE(review): presumably IPv4 subnets are stored with an IPv6-mapped
// prefix length (hence the offset of 96) -- confirm against the producer.
string Formatter::Render(const threading::Value::subnet_t& subnet) const
	{
	char width[16];

	const bool is_v4 = ( subnet.prefix.family == IPv4 );
	modp_uitoa10(is_v4 ? subnet.length - 96 : subnet.length, width);

	return Render(subnet.prefix) + "/" + width;
	}
// Renders a double as text using modp_dtoa(). The value 6 is passed as
// modp_dtoa's third argument (presumably the number of decimal places).
string Formatter::Render(double d) const
	{
	char rendered[256];
	modp_dtoa(d, rendered, 6);
	return string(rendered);
	}

View file

@ -1,42 +1,20 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef THREADING_ASCII_FORMATTER_H
#define THREADING_ASCII_FORMATTER_H
#ifndef THREADING_FORMATTER_H
#define THREADING_FORMATTER_H
#include "../Desc.h"
#include "MsgThread.h"
namespace threading { namespace formatter {
/**
* A thread-safe class for converting values into a readable ASCII
* representation, and vice versa. This is a utility class that factors out
* common rendering/parsing code needed by a number of input/output threads.
* A thread-safe class for converting values into some textual format.
* This is a base class that implements the interface for common
* rendering/parsing code needed by a number of input/output threads.
*/
class AsciiFormatter {
class Formatter {
public:
/**
* A struct to pass the necessary configuration values to the
* AsciiFormatter module on initialization.
*/
struct SeparatorInfo
{
string set_separator; // Separator between set elements.
string unset_field; // String marking an unset field.
string empty_field; // String marking an empty (but set) field.
/**
* Constructor that leaves separators etc unset to dummy
* values. Useful if you use only methods that don't need any
* of them, like StringToAddr, etc.
*/
SeparatorInfo();
/**
* Constructor that defines all the configuration options.
* Use if you need either ValToODesc or EntryToVal.
*/
SeparatorInfo(const string& set_separator, const string& unset_field, const string& empty_field);
};
/**
* Constructor.
*
@ -44,31 +22,69 @@ public:
* some of the thread's methods, e.g., for error reporting and
* internal formatting.
*
* @param info SeparatorInfo structure defining the necessary
* separators.
*/
AsciiFormatter(threading::MsgThread* t, const SeparatorInfo info);
Formatter(threading::MsgThread* t);
/**
* Destructor.
*/
~AsciiFormatter();
virtual ~Formatter();
/**
* Convert a threading value into a corresponding ASCII.
* representation.
* Convert a list of threading values into an implementation specific representation.
*
* @param desc The ODesc object to write to.
*
* @param num_fields The number of fields in the logging record.
*
* @param fields Information about the fields for each of the given log values.
*
* @param vals The field values.
*
* @return Returns true on success, false on error. Errors are also
* flagged via the reporter.
*/
virtual bool Describe(ODesc* desc, int num_fields, const threading::Field* const * fields,
threading::Value** vals) const = 0;
/**
* Convert a threading value into an implementation specific representation.
*
* @param desc The ODesc object to write to.
*
* @param val the Value to render to the ODesc object.
*
* @param The name of a field associated with the value. Used only
* for error reporting.
* @return Returns true on success, false on error. Errors are also
* flagged via the reporter.
*/
virtual bool Describe(ODesc* desc, threading::Value* val) const = 0;
/**
* Convert a threading value into an implementation specific representation.
*
* @param desc The ODesc object to write to.
*
* @param val the Value to render to the ODesc object.
*
* @param The name of a field associated with the value.
*
* @return Returns true on success, false on error. Errors are also
* flagged via the reporter.
*/
bool Describe(ODesc* desc, threading::Value* val, const string& name) const;
virtual bool Describe(ODesc* desc, threading::Value* val, const string& name) const = 0;
/**
* Convert a representation of a field into a value.
*
* @param s The string to parse.
*
* @param The name of a field associated with the value. Used only
* for error reporting.
*
* @return The new value, or null on error. Errors are also flagged
* via the reporter.
*/
virtual threading::Value* ParseValue(string s, string name, TypeTag type, TypeTag subtype = TYPE_ERROR) const = 0;
/**
* Convert an IP address into a string.
@ -98,19 +114,6 @@ public:
*/
string Render(double d) const;
/**
* Convert the ASCII representation of a field into a value.
*
* @param s The string to parse.
*
* @param The name of a field associated with the value. Used only
* for error reporting.
*
* @return The new value, or null on error. Errors are also flagged
* via the reporter.
*/
threading::Value* ParseValue(string s, string name, TypeTag type, TypeTag subtype = TYPE_ERROR) const;
/**
* Convert a string into a TransportProto. The string must be one of
* \c tcp, \c udp, \c icmp, or \c unknown.
@ -132,11 +135,12 @@ public:
*/
threading::Value::addr_t ParseAddr(const string &addr) const;
private:
protected:
bool CheckNumberError(const string& s, const char * end) const;
SeparatorInfo separators;
threading::MsgThread* thread;
};
#endif /* THREADING_ASCII_FORMATTER_H */
}}
#endif /* THREADING_FORMATTER_H */

View file

@ -5,35 +5,59 @@
#include <sstream>
#include <errno.h>
#include "AsciiFormatter.h"
#include "bro_inet_ntop.h"
#include "./Ascii.h"
AsciiFormatter::SeparatorInfo::SeparatorInfo()
using namespace threading::formatter;
// Default constructor: fills every separator with a conspicuous dummy value
// so that accidental use shows up in the output.
Ascii::SeparatorInfo::SeparatorInfo()
	: separator("SHOULD_NOT_BE_USED"),
	  set_separator("SHOULD_NOT_BE_USED"),
	  unset_field("SHOULD_NOT_BE_USED"),
	  empty_field("SHOULD_NOT_BE_USED")
	{
	}
AsciiFormatter::SeparatorInfo::SeparatorInfo(const string & set_separator,
const string & unset_field, const string & empty_field)
// Constructor that stores all four configuration strings as given.
Ascii::SeparatorInfo::SeparatorInfo(const string & separator,
                                    const string & set_separator,
                                    const string & unset_field,
                                    const string & empty_field)
	: separator(separator),
	  set_separator(set_separator),
	  unset_field(unset_field),
	  empty_field(empty_field)
	{
	}
AsciiFormatter::AsciiFormatter(threading::MsgThread* t, const SeparatorInfo info)
Ascii::Ascii(threading::MsgThread* t, const SeparatorInfo info) : Formatter(t)
{
thread = t;
this->separators = info;
}
AsciiFormatter::~AsciiFormatter()
// Destructor. The formatter holds no resources of its own to release.
Ascii::~Ascii()
{
}
bool AsciiFormatter::Describe(ODesc* desc, threading::Value* val, const string& name) const
// Writes all values of one record to desc, inserting the configured column
// separator between consecutive fields. Stops and returns false as soon as
// one field fails to render; returns true otherwise.
bool Ascii::Describe(ODesc* desc, int num_fields, const threading::Field* const * fields,
                     threading::Value** vals) const
	{
	for ( int idx = 0; idx < num_fields; ++idx )
		{
		// No separator in front of the first column.
		if ( idx != 0 )
			desc->AddRaw(separators.separator);

		const bool rendered = Describe(desc, vals[idx], fields[idx]->name);

		if ( ! rendered )
			return false;
		}

	return true;
	}
// Named variant of Describe(). The ASCII format does not write field names,
// so the name is ignored and the call is forwarded to the unnamed overload.
bool Ascii::Describe(ODesc* desc, threading::Value* val, const string& name) const
	{
	const bool rendered = Describe(desc, val);
	return rendered;
	}
bool Ascii::Describe(ODesc* desc, threading::Value* val) const
{
if ( ! val->present )
{
@ -133,7 +157,7 @@ bool AsciiFormatter::Describe(ODesc* desc, threading::Value* val, const string&
if ( j > 0 )
desc->AddRaw(separators.set_separator);
if ( ! Describe(desc, val->val.set_val.vals[j], name) )
if ( ! Describe(desc, val->val.set_val.vals[j]) )
{
desc->RemoveEscapeSequence(separators.set_separator);
return false;
@ -158,7 +182,7 @@ bool AsciiFormatter::Describe(ODesc* desc, threading::Value* val, const string&
if ( j > 0 )
desc->AddRaw(separators.set_separator);
if ( ! Describe(desc, val->val.vector_val.vals[j], name) )
if ( ! Describe(desc, val->val.vector_val.vals[j]) )
{
desc->RemoveEscapeSequence(separators.set_separator);
return false;
@ -170,7 +194,7 @@ bool AsciiFormatter::Describe(ODesc* desc, threading::Value* val, const string&
}
default:
thread->Error(thread->Fmt("unsupported field format %d for %s", val->type, name.c_str()));
thread->Error(thread->Fmt("Ascii writer unsupported field format %d", val->type));
return false;
}
@ -178,7 +202,7 @@ bool AsciiFormatter::Describe(ODesc* desc, threading::Value* val, const string&
}
threading::Value* AsciiFormatter::ParseValue(string s, string name, TypeTag type, TypeTag subtype) const
threading::Value* Ascii::ParseValue(string s, string name, TypeTag type, TypeTag subtype) const
{
if ( s.compare(separators.unset_field) == 0 ) // field is not set...
return new threading::Value(type, false);
@ -381,129 +405,3 @@ parse_error:
delete val;
return 0;
}
// Checks the outcome of a string-to-number conversion and reports any
// problem through the owning thread.
//
// NOTE(review): presumably called directly after a strtol()/strtod()-style
// call, with `end` being the end pointer that call produced and errno still
// holding its status -- confirm against the callers.
//
// s:   The string that was converted.
// end: Pointer to the first character the conversion did not consume.
//
// Returns true if the conversion has to be treated as failed (empty input,
// nothing parseable, or errno set to EINVAL/ERANGE) and false otherwise.
// Trailing non-numeric characters only produce a warning.
bool AsciiFormatter::CheckNumberError(const string& s, const char* end) const
	{
	// Snapshot errno before doing anything else: the reporting calls below
	// may modify it, which would hide or fake a conversion error.
	const int conversion_errno = errno;

	// Do this check first, before executing s.c_str() or similar.
	// otherwise the value to which *end is pointing at the moment might
	// be gone ...
	bool endnotnull = (*end != '\0');

	if ( s.length() == 0 )
		{
		thread->Error("Got empty string for number field");
		return true;
		}

	if ( end == s.c_str() )
		{
		thread->Error(thread->Fmt("String '%s' contained no parseable number", s.c_str()));
		return true;
		}

	if ( endnotnull )
		thread->Warning(thread->Fmt("Number '%s' contained non-numeric trailing characters. Ignored trailing characters '%s'", s.c_str(), end));

	if ( conversion_errno == EINVAL )
		{
		thread->Error(thread->Fmt("String '%s' could not be converted to a number", s.c_str()));
		return true;
		}

	else if ( conversion_errno == ERANGE )
		{
		thread->Error(thread->Fmt("Number '%s' out of supported range.", s.c_str()));
		return true;
		}

	return false;
	}
// Renders an IP address as text via bro_inet_ntop(). Any family other than
// IPv4 is rendered as IPv6. If the conversion fails, a fixed placeholder
// string is returned instead.
string AsciiFormatter::Render(const threading::Value::addr_t& addr) const
	{
	if ( addr.family != IPv4 )
		{
		char text6[INET6_ADDRSTRLEN];

		if ( bro_inet_ntop(AF_INET6, &addr.in.in6, text6, INET6_ADDRSTRLEN) )
			return text6;

		return "<bad IPv6 address conversion>";
		}

	char text4[INET_ADDRSTRLEN];

	if ( bro_inet_ntop(AF_INET, &addr.in.in4, text4, INET_ADDRSTRLEN) )
		return text4;

	return "<bad IPv4 address conversion>";
	}
// Maps the strings "tcp", "udp", "icmp" and "unknown" to the corresponding
// TransportProto value. Any other input is reported as an error through the
// thread and yields TRANSPORT_UNKNOWN.
TransportProto AsciiFormatter::ParseProto(const string &proto) const
	{
	if ( proto == "tcp" )
		return TRANSPORT_TCP;

	if ( proto == "udp" )
		return TRANSPORT_UDP;

	if ( proto == "icmp" )
		return TRANSPORT_ICMP;

	// "unknown" is a legal value; everything else is an error.
	if ( proto != "unknown" )
		thread->Error(thread->Fmt("Tried to parse invalid/unknown protocol: %s", proto.c_str()));

	return TRANSPORT_UNKNOWN;
	}
// More or less verbose copy from IPAddr.cc -- which uses reporter.
//
// Parses a textual IP address. A string containing a ':' is treated as
// IPv6, anything else as IPv4. On a parse failure an error is reported
// through the thread and the address bytes are zeroed.
threading::Value::addr_t AsciiFormatter::ParseAddr(const string &s) const
	{
	threading::Value::addr_t result;

	const bool looks_like_v6 = ( s.find(':') != std::string::npos );

	if ( looks_like_v6 )
		{
		result.family = IPv6;

		const int rc = inet_pton(AF_INET6, s.c_str(), result.in.in6.s6_addr);

		if ( rc <= 0 )
			{
			thread->Error(thread->Fmt("Bad address: %s", s.c_str()));
			memset(result.in.in6.s6_addr, 0, sizeof(result.in.in6.s6_addr));
			}

		return result;
		}

	result.family = IPv4;

	const int rc = inet_aton(s.c_str(), &(result.in.in4));

	if ( rc <= 0 )
		{
		thread->Error(thread->Fmt("Bad address: %s", s.c_str()));
		memset(&result.in.in4.s_addr, 0, sizeof(result.in.in4.s_addr));
		}

	return result;
	}
// Renders a subnet as "<prefix>/<length>". For IPv4 prefixes 96 is
// subtracted from the stored length before printing.
// NOTE(review): presumably IPv4 subnets are stored with an IPv6-mapped
// prefix length (hence the offset of 96) -- confirm against the producer.
string AsciiFormatter::Render(const threading::Value::subnet_t& subnet) const
	{
	char width[16];

	const bool is_v4 = ( subnet.prefix.family == IPv4 );
	modp_uitoa10(is_v4 ? subnet.length - 96 : subnet.length, width);

	return Render(subnet.prefix) + "/" + width;
	}
// Renders a double as text using modp_dtoa(). The value 6 is passed as
// modp_dtoa's third argument (presumably the number of decimal places).
string AsciiFormatter::Render(double d) const
	{
	char rendered[256];
	modp_dtoa(d, rendered, 6);
	return string(rendered);
	}

View file

@ -0,0 +1,62 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef THREADING_FORMATTERS_ASCII_H
#define THREADING_FORMATTERS_ASCII_H
#include "../Formatter.h"
namespace threading { namespace formatter {
/**
* Formatter implementation that renders values as separator-delimited
* ASCII text and parses such text back into values.
*/
class Ascii : public Formatter {
public:
/**
* A struct to pass the necessary configuration values to the
* Ascii module on initialization.
*/
struct SeparatorInfo
{
string separator; // Separator between columns
string set_separator; // Separator between set elements.
string unset_field; // String marking an unset field.
string empty_field; // String marking an empty (but set) field.
/**
* Constructor that leaves separators etc unset to dummy
* values. Useful if you use only methods that don't need any
* of them, like ParseAddr, etc.
*/
SeparatorInfo();
/**
* Constructor that defines all the configuration options.
* Use if you need either Describe or ParseValue.
*/
SeparatorInfo(const string& separator, const string& set_separator, const string& unset_field, const string& empty_field);
};
/**
* Constructor.
*
* @param t The thread that uses this class instance. The class uses
* some of the thread's methods, e.g., for error reporting and
* internal formatting.
*
* @param info SeparatorInfo structure defining the necessary
* separators.
*/
Ascii(threading::MsgThread* t, const SeparatorInfo info);
/**
* Destructor.
*/
virtual ~Ascii();
/**
* Renders a single value to the ODesc object.
*
* @return Returns true on success, false on error.
*/
virtual bool Describe(ODesc* desc, threading::Value* val) const;
/**
* Renders a single value to the ODesc object. The field name is not
* part of the ASCII output; this overload forwards to the unnamed one.
*
* @return Returns true on success, false on error.
*/
virtual bool Describe(ODesc* desc, threading::Value* val, const string& name) const;
/**
* Renders all values of a record, placing the column separator
* between consecutive fields.
*
* @return Returns true on success, false as soon as one field fails.
*/
virtual bool Describe(ODesc* desc, int num_fields, const threading::Field* const * fields,
threading::Value** vals) const;
/**
* Converts the ASCII representation of a field into a value. A string
* equal to the configured unset_field marker yields an unset value.
*
* @return The new value, or null on error.
*/
virtual threading::Value* ParseValue(string s, string name, TypeTag type, TypeTag subtype = TYPE_ERROR) const;
private:
// Separator configuration passed to the constructor.
SeparatorInfo separators;
};
}}
#endif /* THREADING_FORMATTERS_ASCII_H */

View file

@ -0,0 +1,177 @@
// See the file "COPYING" in the main distribution directory for copyright.
#include "config.h"
#include <sstream>
#include <errno.h>
#include "./JSON.h"
using namespace threading::formatter;
// Constructor. Passes the owning thread on to the Formatter base class; the
// JSON formatter keeps no state of its own.
JSON::JSON(MsgThread* t) : Formatter(t)
{
}
// Destructor. Nothing to release.
JSON::~JSON()
{
}
// Renders one record as a JSON object: "{" followed by one "name":value
// member per present field, separated by commas, followed by "}". Fields
// whose value is not present are omitted entirely.
//
// desc:       The ODesc object to write to.
// num_fields: Number of entries in fields and vals.
// fields:     Field descriptions; only the name is used here.
// vals:       The field values.
//
// Returns true on success and false as soon as one field fails to render
// (in which case desc holds partial output).
bool JSON::Describe(ODesc* desc, int num_fields, const Field* const * fields,
                    Value** vals) const
	{
	desc->AddRaw("{");

	// Remember whether a member has already been emitted instead of peeking
	// at the output buffer: a separator is needed exactly between two
	// emitted members, never before the first one, regardless of how many
	// absent fields precede it.
	bool wrote_member = false;

	for ( int i = 0; i < num_fields; i++ )
		{
		// Absent fields produce no output at all.
		if ( ! vals[i]->present )
			continue;

		if ( wrote_member )
			desc->AddRaw(",");

		if ( ! Describe(desc, vals[i], fields[i]->name) )
			return false;

		wrote_member = true;
		}

	desc->AddRaw("}");
	return true;
	}
// Renders a single object member of the form "name":value. A value that is
// not present produces no output and counts as success.
bool JSON::Describe(ODesc* desc, Value* val, const string& name) const
	{
	if ( val->present )
		{
		// Key in double quotes, followed by the colon.
		desc->AddRaw("\"", 1);
		desc->Add(name);
		desc->AddRaw("\":", 2);

		return Describe(desc, val);
		}

	return true;
	}
// Renders a single value as JSON.
//
// - bool becomes true/false; int, count, counter, port, double and interval
//   become JSON numbers.
// - time is written as an integer number of milliseconds.
// - count and time values of INT64_MAX or above are reported as an error
//   and written as null.
// - addr and subnet are written as quoted strings.
// - enum, string, file and func are written as quoted strings with special
//   characters escaped as \u00XX.
// - table (set) and vector values become JSON arrays.
//
// A value that is not present produces no output and counts as success.
//
// Returns true on success. Returns false for an unsupported type or if an
// element of a set/vector fails to render; both cases leave partial output
// in desc.
bool JSON::Describe(ODesc* desc, Value* val) const
	{
	if ( ! val->present )
		return true;

	switch ( val->type )
		{
		case TYPE_BOOL:
			desc->AddRaw(val->val.int_val == 0 ? "false" : "true");
			break;

		case TYPE_INT:
			desc->Add(val->val.int_val);
			break;

		case TYPE_COUNT:
		case TYPE_COUNTER:
			{
			// JSON doesn't support unsigned 64bit ints.
			if ( val->val.uint_val >= INT64_MAX )
				{
				thread->Error(thread->Fmt("count value too large for JSON: %" PRIu64, val->val.uint_val));
				desc->AddRaw("null", 4);
				}
			else
				desc->Add(val->val.uint_val);
			break;
			}

		case TYPE_PORT:
			desc->Add(val->val.port_val.port);
			break;

		case TYPE_SUBNET:
			desc->AddRaw("\"", 1);
			desc->Add(Render(val->val.subnet_val));
			desc->AddRaw("\"", 1);
			break;

		case TYPE_ADDR:
			desc->AddRaw("\"", 1);
			desc->Add(Render(val->val.addr_val));
			desc->AddRaw("\"", 1);
			break;

		case TYPE_DOUBLE:
		case TYPE_INTERVAL:
			desc->Add(val->val.double_val);
			break;

		case TYPE_TIME:
			{
			// ElasticSearch uses milliseconds for timestamps and json only
			// supports signed ints (uints can be too large).
			uint64_t ts = (uint64_t) (val->val.double_val * 1000);
			if ( ts >= INT64_MAX )
				{
				thread->Error(thread->Fmt("time value too large for JSON: %" PRIu64, ts));
				desc->AddRaw("null", 4);
				}
			else
				desc->Add(ts);
			break;
			}

		case TYPE_ENUM:
		case TYPE_STRING:
		case TYPE_FILE:
		case TYPE_FUNC:
			{
			desc->AddRaw("\"", 1);
			for ( int i = 0; i < val->val.string_val.length; ++i )
				{
				char c = val->val.string_val.data[i];
				// 2byte Unicode escape special characters.
				if ( c < 32 || c > 126 || c == '\n' || c == '"' || c == '\'' || c == '\\' || c == '&' )
					{
					static const char hex_chars[] = "0123456789abcdef";
					desc->AddRaw("\\u00", 4);
					desc->AddRaw(&hex_chars[(c & 0xf0) >> 4], 1);
					desc->AddRaw(&hex_chars[c & 0x0f], 1);
					}
				else
					desc->AddRaw(&c, 1);
				}
			desc->AddRaw("\"", 1);
			break;
			}

		case TYPE_TABLE:
			{
			desc->AddRaw("[", 1);
			for ( int j = 0; j < val->val.set_val.size; j++ )
				{
				if ( j > 0 )
					desc->AddRaw(",", 1);

				// Propagate element failures instead of silently writing
				// a truncated array and reporting success.
				if ( ! Describe(desc, val->val.set_val.vals[j]) )
					return false;
				}
			desc->AddRaw("]", 1);
			break;
			}

		case TYPE_VECTOR:
			{
			desc->AddRaw("[", 1);
			for ( int j = 0; j < val->val.vector_val.size; j++ )
				{
				if ( j > 0 )
					desc->AddRaw(",", 1);

				// Same as for sets: a failing element fails the whole value.
				if ( ! Describe(desc, val->val.vector_val.vals[j]) )
					return false;
				}
			desc->AddRaw("]", 1);
			break;
			}

		default:
			// Report the reason for the failure, as the Ascii formatter does.
			thread->Error(thread->Fmt("JSON writer unsupported field format %d", val->type));
			return false;
		}

	return true;
	}
// Parsing JSON back into values is not implemented: every call reports an
// error through the thread and returns a null pointer. All arguments are
// ignored.
threading::Value* JSON::ParseValue(string s, string name, TypeTag type, TypeTag subtype) const
	{
	threading::Value* const no_value = NULL;

	thread->Error("JSON formatter does not support parsing yet.");

	return no_value;
	}

View file

@ -0,0 +1,30 @@
// See the file "COPYING" in the main distribution directory for copyright.
#ifndef THREADING_FORMATTERS_JSON_H
#define THREADING_FORMATTERS_JSON_H
#include "../Formatter.h"
namespace threading { namespace formatter {
/**
* A thread-safe class for converting values into a JSON representation.
* Parsing JSON back into values is not supported yet: ParseValue() only
* reports an error and returns null.
*/
class JSON : public Formatter {
public:
/**
* Constructor.
*
* @param t The thread that uses this class instance; it is handed to the
* Formatter base class.
*/
JSON(threading::MsgThread* t);
/**
* Destructor.
*/
virtual ~JSON();
/**
* Renders a single value as JSON.
*
* @return Returns true on success, false on error.
*/
virtual bool Describe(ODesc* desc, threading::Value* val) const;
/**
* Renders a single value as a JSON object member ("name":value).
* Values that are not present produce no output.
*
* @return Returns true on success, false on error.
*/
virtual bool Describe(ODesc* desc, threading::Value* val, const string& name) const;
/**
* Renders a record as a JSON object with one member per present field.
*
* @return Returns true on success, false on error.
*/
virtual bool Describe(ODesc* desc, int num_fields, const threading::Field* const * fields,
threading::Value** vals) const;
/**
* Not supported yet; reports an error and returns null.
*/
virtual threading::Value* ParseValue(string s, string name, TypeTag type, TypeTag subtype = TYPE_ERROR) const;
private:
};
}}
#endif /* THREADING_FORMATTERS_JSON_H */