diff --git a/scripts/base/init-bare.bro b/scripts/base/init-bare.bro index cfe845eb4f..6439ab7668 100644 --- a/scripts/base/init-bare.bro +++ b/scripts/base/init-bare.bro @@ -3207,6 +3207,11 @@ const forward_remote_events = F &redef; ## more sophisticated script-level communication framework. const forward_remote_state_changes = F &redef; +## The number of IO chunks allowed to be buffered between the child +## and parent process of remote communication before Bro starts dropping +## connections to remote peers in an attempt to catch up. +const chunked_io_buffer_soft_cap = 800000 &redef; + ## Place-holder constant indicating "no peer". const PEER_ID_NONE = 0; diff --git a/src/ChunkedIO.cc b/src/ChunkedIO.cc index 722b209bcd..1e581806d6 100644 --- a/src/ChunkedIO.cc +++ b/src/ChunkedIO.cc @@ -137,20 +137,6 @@ bool ChunkedIOFd::Write(Chunk* chunk) chunk->len, fmt_bytes(chunk->data, min((uint32)20, chunk->len))); #endif - // Reject if our queue of pending chunks is way too large. Otherwise, - // memory could fill up if the other side doesn't read. - if ( stats.pending > MAX_BUFFERED_CHUNKS ) - { - DBG_LOG(DBG_CHUNKEDIO, "write queue full"); - -#ifdef DEBUG_COMMUNICATION - AddToBuffer("", false); -#endif - - errno = ENOSPC; - return false; - } - #ifdef DEBUG_COMMUNICATION AddToBuffer(chunk, false); #endif @@ -627,7 +613,7 @@ bool ChunkedIOFd::IsIdle() bool ChunkedIOFd::IsFillingUp() { - return stats.pending > MAX_BUFFERED_CHUNKS_SOFT; + return stats.pending > chunked_io_buffer_soft_cap; } iosource::FD_Set ChunkedIOFd::ExtraReadFDs() const @@ -838,15 +824,6 @@ bool ChunkedIOSSL::Write(Chunk* chunk) chunk->len, fmt_bytes(chunk->data, 20)); #endif - // Reject if our queue of pending chunks is way too large. Otherwise, - // memory could fill up if the other side doesn't read. - if ( stats.pending > MAX_BUFFERED_CHUNKS ) - { - DBG_LOG(DBG_CHUNKEDIO, "write queue full"); - errno = ENOSPC; - return false; - } - // Queue it. ++stats.pending; Queue* q = new Queue; diff --git a/src/ChunkedIO.h b/src/ChunkedIO.h index b590453a72..afb239b325 100644 --- a/src/ChunkedIO.h +++ b/src/ChunkedIO.h @@ -221,13 +221,6 @@ private: // than BUFFER_SIZE. static const uint32 FLAG_PARTIAL = 0x80000000; - // We report that we're filling up when there are more than this number - // of pending chunks. - static const uint32 MAX_BUFFERED_CHUNKS_SOFT = 400000; - - // Maximum number of chunks we store in memory before rejecting writes. - static const uint32 MAX_BUFFERED_CHUNKS = 500000; - char* read_buffer; uint32 read_len; uint32 read_pos; @@ -275,8 +268,6 @@ public: virtual void Stats(char* buffer, int length); private: - // Maximum number of chunks we store in memory before rejecting writes. - static const uint32 MAX_BUFFERED_CHUNKS = 500000; // Only returns true if all data has been read. If not, call // it again with the same parameters as long as error is not diff --git a/src/NetVar.cc b/src/NetVar.cc index 7c66b55bc2..0db40ca9ef 100644 --- a/src/NetVar.cc +++ b/src/NetVar.cc @@ -178,6 +178,7 @@ RecordType* peer; int forward_remote_state_changes; int forward_remote_events; int remote_check_sync_consistency; +bro_uint_t chunked_io_buffer_soft_cap; StringVal* ssl_ca_certificate; StringVal* ssl_private_key; @@ -276,6 +277,7 @@ void init_general_global_var() forward_remote_events = opt_internal_int("forward_remote_events"); remote_check_sync_consistency = opt_internal_int("remote_check_sync_consistency"); + chunked_io_buffer_soft_cap = opt_internal_unsigned("chunked_io_buffer_soft_cap"); ssl_ca_certificate = internal_val("ssl_ca_certificate")->AsStringVal(); ssl_private_key = internal_val("ssl_private_key")->AsStringVal(); diff --git a/src/NetVar.h b/src/NetVar.h index edd70d1ea6..9779884096 100644 --- a/src/NetVar.h +++ b/src/NetVar.h @@ -181,6 +181,7 @@ extern RecordType* peer; extern int forward_remote_state_changes; extern int forward_remote_events; extern int remote_check_sync_consistency; +extern bro_uint_t chunked_io_buffer_soft_cap; extern StringVal* ssl_ca_certificate; extern StringVal* ssl_private_key; diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index 08b7fe9fbf..d359c1ea5d 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -3464,7 +3464,8 @@ void SocketComm::Run() int a = select(max_fd + 1, &fd_read, &fd_write, &fd_except, 0); if ( selects % 100000 == 0 ) - Log(fmt("selects=%ld canwrites=%ld", selects, canwrites)); + Log(fmt("selects=%ld canwrites=%ld pending=%lu", + selects, canwrites, io->Stats()->pending)); if ( a < 0 ) // Ignore errors for now.