Fix attempt for "internal error: unknown msg type 115 in Poll()"

Under remote communication overload conditions, the child->parent
chunked IO may start rejecting chunks if over the hard cap.  Some
messages are made of two chunks, accepting the first part, but rejecting
the second can put the parent in a bad state and the next two chunks it
reads are likely to cause the error.

This patch just removes the rejecting functionality completely and so
now relies solely on shutting down remote peer connections to help
alleviate temporary overload conditions. The
"chunked_io_buffer_soft_cap" script variable can now tune when this
shutting down starts happening and the default setting is now double
what it used to be.  For constant overload conditions, communication.log
should keep stating "queue to parent filling up; shutting down heaviest
connection".

An alternative to completely removing the hard cap rejection code could
be ensuring that messages that involve a pair of chunks can never have
the second chunk be rejected when attempting to write it.

Addresses BIT-1376
This commit is contained in:
Jon Siwek 2015-04-16 17:02:44 -05:00
parent 8789d7f527
commit effeaa5b13
6 changed files with 11 additions and 34 deletions

View file

@ -3207,6 +3207,11 @@ const forward_remote_events = F &redef;
## more sophisticated script-level communication framework.
const forward_remote_state_changes = F &redef;
## The number of IO chunks allowed to be buffered between the child
## and parent process of remote communication before Bro starts dropping
## connections to remote peers in an attempt to catch up.
const chunked_io_buffer_soft_cap = 800000 &redef;
## Place-holder constant indicating "no peer".
const PEER_ID_NONE = 0;

View file

@ -137,20 +137,6 @@ bool ChunkedIOFd::Write(Chunk* chunk)
chunk->len, fmt_bytes(chunk->data, min((uint32)20, chunk->len)));
#endif
// Reject if our queue of pending chunks is way too large. Otherwise,
// memory could fill up if the other side doesn't read.
if ( stats.pending > MAX_BUFFERED_CHUNKS )
{
DBG_LOG(DBG_CHUNKEDIO, "write queue full");
#ifdef DEBUG_COMMUNICATION
AddToBuffer("<false:write-queue-full>", false);
#endif
errno = ENOSPC;
return false;
}
#ifdef DEBUG_COMMUNICATION
AddToBuffer(chunk, false);
#endif
@ -627,7 +613,7 @@ bool ChunkedIOFd::IsIdle()
bool ChunkedIOFd::IsFillingUp()
{
return stats.pending > MAX_BUFFERED_CHUNKS_SOFT;
return stats.pending > chunked_io_buffer_soft_cap;
}
iosource::FD_Set ChunkedIOFd::ExtraReadFDs() const
@ -838,15 +824,6 @@ bool ChunkedIOSSL::Write(Chunk* chunk)
chunk->len, fmt_bytes(chunk->data, 20));
#endif
// Reject if our queue of pending chunks is way too large. Otherwise,
// memory could fill up if the other side doesn't read.
if ( stats.pending > MAX_BUFFERED_CHUNKS )
{
DBG_LOG(DBG_CHUNKEDIO, "write queue full");
errno = ENOSPC;
return false;
}
// Queue it.
++stats.pending;
Queue* q = new Queue;

View file

@ -221,13 +221,6 @@ private:
// than BUFFER_SIZE.
static const uint32 FLAG_PARTIAL = 0x80000000;
// We report that we're filling up when there are more than this number
// of pending chunks.
static const uint32 MAX_BUFFERED_CHUNKS_SOFT = 400000;
// Maximum number of chunks we store in memory before rejecting writes.
static const uint32 MAX_BUFFERED_CHUNKS = 500000;
char* read_buffer;
uint32 read_len;
uint32 read_pos;
@ -275,8 +268,6 @@ public:
virtual void Stats(char* buffer, int length);
private:
// Maximum number of chunks we store in memory before rejecting writes.
static const uint32 MAX_BUFFERED_CHUNKS = 500000;
// Only returns true if all data has been read. If not, call
// it again with the same parameters as long as error is not

View file

@ -178,6 +178,7 @@ RecordType* peer;
int forward_remote_state_changes;
int forward_remote_events;
int remote_check_sync_consistency;
bro_uint_t chunked_io_buffer_soft_cap;
StringVal* ssl_ca_certificate;
StringVal* ssl_private_key;
@ -276,6 +277,7 @@ void init_general_global_var()
forward_remote_events = opt_internal_int("forward_remote_events");
remote_check_sync_consistency =
opt_internal_int("remote_check_sync_consistency");
chunked_io_buffer_soft_cap = opt_internal_unsigned("chunked_io_buffer_soft_cap");
ssl_ca_certificate = internal_val("ssl_ca_certificate")->AsStringVal();
ssl_private_key = internal_val("ssl_private_key")->AsStringVal();

View file

@ -181,6 +181,7 @@ extern RecordType* peer;
extern int forward_remote_state_changes;
extern int forward_remote_events;
extern int remote_check_sync_consistency;
extern bro_uint_t chunked_io_buffer_soft_cap;
extern StringVal* ssl_ca_certificate;
extern StringVal* ssl_private_key;

View file

@ -3464,7 +3464,8 @@ void SocketComm::Run()
int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, 0);
if ( selects % 100000 == 0 )
Log(fmt("selects=%ld canwrites=%ld", selects, canwrites));
Log(fmt("selects=%ld canwrites=%ld pending=%lu",
selects, canwrites, io->Stats()->pending));
if ( a < 0 )
// Ignore errors for now.