binpac: Add FlowBuffer policy mechanisms

This allows for tunability of the following behaviors:

* Minimum flowbuffer capacity to use when parsing a new unit

* Threshold at which flowbuffer capacity is contracted back to the
  minimum after parsing a complete unit and before parsing the next

* Maximum flowbuffer capacity to allow when parsing a given unit

Failed flowbuffer allocations due to reaching maximum capacity or any
other reason now throw ExceptionFlowBufferAlloc.
This commit is contained in:
Jon Siwek 2019-04-05 12:21:42 -07:00 committed by Tim Wojtulewicz
parent 7e6e24a4d8
commit b4b229acf7
4 changed files with 73 additions and 13 deletions

View file

@ -16,6 +16,15 @@ namespace {
const unsigned char LF = '\n'; const unsigned char LF = '\n';
} }
binpac::FlowBuffer::Policy binpac::FlowBuffer::policy = {
// max_capacity
10 * 1024 * 1024,
// min_capacity
512,
// contract_threshold
2 * 1024 * 1024,
};
FlowBuffer::FlowBuffer(LineBreakStyle linebreak_style) FlowBuffer::FlowBuffer(LineBreakStyle linebreak_style)
{ {
buffer_length_ = 0; buffer_length_ = 0;
@ -75,6 +84,7 @@ void FlowBuffer::NewMessage()
buffer_n_ = 0; buffer_n_ = 0;
message_complete_ = false; message_complete_ = false;
ContractBuffer();
} }
void FlowBuffer::ResetLineState() void FlowBuffer::ResetLineState()
@ -99,23 +109,41 @@ void FlowBuffer::ExpandBuffer(int length)
{ {
if ( buffer_length_ >= length ) if ( buffer_length_ >= length )
return; return;
// So length > 0
if ( length < 512 ) if ( length < policy.min_capacity )
length = 512; length = policy.min_capacity;
if ( length < buffer_length_ * 2 ) if ( length < buffer_length_ * 2 )
length = buffer_length_ * 2; length = buffer_length_ * 2;
if ( length > policy.max_capacity )
{
std::string reason = strfmt("expand past max capacity %d/%zu",
length, policy.max_capacity);
throw ExceptionFlowBufferAlloc(reason.c_str());
}
// Allocate a new buffer and copy the existing contents // Allocate a new buffer and copy the existing contents
buffer_length_ = length; buffer_length_ = length;
unsigned char* new_buf = (unsigned char *) realloc(buffer_, buffer_length_); unsigned char* new_buf = (unsigned char *) realloc(buffer_, buffer_length_);
BINPAC_ASSERT(new_buf);
#if 0 if ( ! new_buf )
unsigned char* new_buf = new unsigned char[buffer_length_]; throw ExceptionFlowBufferAlloc("expand realloc OOM");
if ( buffer_ && buffer_n_ > 0 )
memcpy(new_buf, buffer_, buffer_n_); buffer_ = new_buf;
delete [] buffer_; }
#endif
void FlowBuffer::ContractBuffer()
{
if ( buffer_length_ < policy.contract_threshold )
return;
buffer_length_ = policy.min_capacity;
unsigned char* new_buf = (unsigned char *) realloc(buffer_, buffer_length_);
if ( ! new_buf )
throw ExceptionFlowBufferAlloc("contract realloc OOM");
buffer_ = new_buf; buffer_ = new_buf;
} }
@ -186,6 +214,7 @@ void FlowBuffer::DiscardData()
buffer_n_ = 0; buffer_n_ = 0;
frame_length_ = 0; frame_length_ = 0;
ContractBuffer();
} }
void FlowBuffer::set_eof() void FlowBuffer::set_eof()

View file

@ -8,6 +8,12 @@ namespace binpac {
class FlowBuffer { class FlowBuffer {
public: public:
struct Policy {
int max_capacity;
int min_capacity;
int contract_threshold;
};
enum LineBreakStyle { enum LineBreakStyle {
CR_OR_LF, // CR or LF or CRLF CR_OR_LF, // CR or LF or CRLF
STRICT_CRLF, // CR followed by LF STRICT_CRLF, // CR followed by LF
@ -95,6 +101,9 @@ public:
bool have_pending_request() const { return have_pending_request_; } bool have_pending_request() const { return have_pending_request_; }
static void init(Policy p)
{ policy = p; }
protected: protected:
// Reset the buffer for a new message // Reset the buffer for a new message
void NewMessage(); void NewMessage();
@ -106,6 +115,13 @@ protected:
// buffer. // buffer.
void ExpandBuffer(int length); void ExpandBuffer(int length);
// Contract the buffer to some minimum capacity.
// Existing contents in the buffer are preserved (but only usage
// at the time of creation this function is when the contents
// are being discarded due to parsing exception or have already been
// copied out after parsing a complete unit).
void ContractBuffer();
// Reset line state when transit from frame mode to line mode. // Reset line state when transit from frame mode to line mode.
void ResetLineState(); void ResetLineState();
@ -153,6 +169,8 @@ protected:
int data_seq_at_orig_data_end_; int data_seq_at_orig_data_end_;
bool eof_; bool eof_;
bool have_pending_request_; bool have_pending_request_;
static Policy policy;
}; };
typedef FlowBuffer *flow_buffer_t; typedef FlowBuffer *flow_buffer_t;

View file

@ -116,6 +116,15 @@ public:
} }
}; };
class ExceptionFlowBufferAlloc : public Exception
{
public:
ExceptionFlowBufferAlloc(const char* reason)
{
append(binpac_fmt("flowbuffer allocation failed: %s", reason));
}
};
} }
#endif // binpac_exception_h #endif // binpac_exception_h

View file

@ -13,7 +13,8 @@ namespace binpac
// //
// Note, this must be declared/defined here, and inline, because the RE // Note, this must be declared/defined here, and inline, because the RE
// functionality can only be used when compiling from inside Bro. // functionality can only be used when compiling from inside Bro.
inline void init(); // A copy is made of any FlowBuffer policy struct data passed.
inline void init(FlowBuffer::Policy* fbp = 0);
// Internal vector recording not yet compiled matchers. // Internal vector recording not yet compiled matchers.
extern std::vector<RE_Matcher*>* uncompiled_re_matchers; extern std::vector<RE_Matcher*>* uncompiled_re_matchers;
@ -42,7 +43,7 @@ public:
} }
private: private:
friend void ::binpac::init(); friend void ::binpac::init(FlowBuffer::Policy*);
// Function, and state, for compiling matchers. // Function, and state, for compiling matchers.
static void init(); static void init();
@ -68,9 +69,12 @@ inline void RegExMatcher::init()
uncompiled_re_matchers->clear(); uncompiled_re_matchers->clear();
} }
inline void init() inline void init(FlowBuffer::Policy* fbp)
{ {
RegExMatcher::init(); RegExMatcher::init();
if ( fbp )
FlowBuffer::init(*fbp);
} }
} // namespace binpac } // namespace binpac