From ecfa03ea1d02cd1be01ec2dc07cc43027b599388 Mon Sep 17 00:00:00 2001 From: Arne Welzel Date: Wed, 2 Oct 2024 09:27:06 +0200 Subject: [PATCH] input/Raw: Rework GetLine() This isn't a straightforward fix, unfortunately. The existing GetLine() implementation didn't deal well with input that's incrementally produced where individually read chunks wouldn't end with the separator. The prior implementation increased the buffer each time it failed to find a separator in the current buffer, but then also ended up not searching the full new buffer size for the terminator, doing that endlessly. This change reworks the Raw reader to rely only on bufpos for reading and searching purposes and skip reallocation if the buffer size if it wasn't actually exhausted. Closes #3957 --- src/input/readers/raw/Raw.cc | 66 ++++++++++++------- src/input/readers/raw/Raw.h | 3 +- .../zeek..stdout | 4 ++ .../zeek..stdout | 4 ++ .../zeek..stdout | 4 ++ .../zeek..stdout | 4 ++ .../input/raw/executestream-leftover.zeek | 52 +++++++++++++++ .../raw/executestream-no-last-separator.zeek | 53 +++++++++++++++ .../input/raw/executestream-slow-long.zeek | 62 +++++++++++++++++ .../input/raw/executestream-slow.zeek | 55 ++++++++++++++++ 10 files changed, 281 insertions(+), 26 deletions(-) create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-leftover/zeek..stdout create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-no-last-separator/zeek..stdout create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow-long/zeek..stdout create mode 100644 testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow/zeek..stdout create mode 100644 testing/btest/scripts/base/frameworks/input/raw/executestream-leftover.zeek create mode 100644 testing/btest/scripts/base/frameworks/input/raw/executestream-no-last-separator.zeek create mode 100644 testing/btest/scripts/base/frameworks/input/raw/executestream-slow-long.zeek create mode 100644 testing/btest/scripts/base/frameworks/input/raw/executestream-slow.zeek diff --git a/src/input/readers/raw/Raw.cc b/src/input/readers/raw/Raw.cc index 5161843af0..af8c434bbf 100644 --- a/src/input/readers/raw/Raw.cc +++ b/src/input/readers/raw/Raw.cc @@ -46,6 +46,7 @@ Raw::Raw(ReaderFrontend* frontend) : ReaderBackend(frontend), file(nullptr, fclo sep_length = BifConst::InputRaw::record_separator->Len(); bufpos = 0; + bufsize = 0; stdin_fileno = fileno(stdin); stdout_fileno = fileno(stdout); @@ -420,59 +421,74 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie int64_t Raw::GetLine(FILE* arg_file) { errno = 0; - int pos = 0; // strstr_n only works on ints - so no use to use something different here - int offset = 0; - if ( ! buf ) + if ( ! buf ) { buf = std::unique_ptr(new char[block_size]); - - int repeats = 1; + bufpos = 0; + bufsize = block_size; + } for ( ;; ) { - size_t readbytes = fread(buf.get() + bufpos + offset, 1, block_size - bufpos, arg_file); - pos += bufpos + readbytes; - // printf("Pos: %d\n", pos); - bufpos = offset = 0; // read full block size in next read... + size_t readbytes = fread(buf.get() + bufpos, 1, bufsize - bufpos, arg_file); - if ( pos == 0 && errno != 0 ) + bufpos = bufpos + readbytes; + + // Nothing in the buffer and errno set, yield. + if ( bufpos == 0 && errno != 0 ) break; // researching everything each time is a bit... cpu-intensive. But otherwise we have // to deal with situations where the separator is multi-character and split over multiple // reads... - int found = util::strstr_n(pos, (unsigned char*)buf.get(), separator.size(), (unsigned char*)separator.c_str()); + // + // memmem() would be more appropriate, but not available on Windows. + int found = util::strstr_n(bufpos, reinterpret_cast(buf.get()), separator.size(), + reinterpret_cast(separator.c_str())); if ( found == -1 ) { - // we did not find it and have to search again in the next try. resize buffer.... + // we did not find it and have to search again in the next try. // but first check if we encountered the file end - because if we did this was it. if ( feof(arg_file) != 0 ) { - if ( pos == 0 ) + if ( bufpos == 0 ) return -1; // signal EOF - and that we had no more data. else { outbuf = std::move(buf); // buf is null after this - return pos; + return bufpos; // flush out remaining buffered data as line } } - repeats++; - // bah, we cannot use realloc because we would have to change the delete in the manager - // to a free. - std::unique_ptr newbuf = std::unique_ptr(new char[block_size * repeats]); - memcpy(newbuf.get(), buf.get(), block_size * (repeats - 1)); - buf = std::move(newbuf); - offset = block_size * (repeats - 1); + // No separator found and buffer full, realloc and retry reading more right away. + if ( bufpos == bufsize ) { + std::unique_ptr newbuf = std::unique_ptr(new char[bufsize + block_size]); + memcpy(newbuf.get(), buf.get(), bufsize); + buf = std::move(newbuf); + bufsize = bufsize + block_size; + } + else { + // Short or empty read, some data in the buffer, but no separator found + // and also not EOF: This is likely reading from a pipe where the separator + // wasn't yet produced. Yield to retry on the next heartbeat. + return -2; + } } else { + size_t sep_idx = static_cast(found); + assert(sep_idx <= bufsize - sep_length); + size_t remaining = bufpos - sep_idx - sep_length; + outbuf = std::move(buf); - if ( found < pos ) { + if ( remaining > 0 ) { // we have leftovers. copy them into the buffer for the next line + assert(remaining <= block_size); buf = std::unique_ptr(new char[block_size]); - memcpy(buf.get(), outbuf.get() + found + sep_length, pos - found - sep_length); - bufpos = pos - found - sep_length; + bufpos = remaining; + bufsize = block_size; + + memcpy(buf.get(), outbuf.get() + sep_idx + sep_length, remaining); } - return found; + return sep_idx; } } diff --git a/src/input/readers/raw/Raw.h b/src/input/readers/raw/Raw.h index e7f788afda..1fc1b52097 100644 --- a/src/input/readers/raw/Raw.h +++ b/src/input/readers/raw/Raw.h @@ -58,7 +58,8 @@ private: std::string separator; unsigned int sep_length; // length of the separator - int bufpos; + size_t bufpos; // Where in buf to read more data. + size_t bufsize; // Currently allocated size of buf. std::unique_ptr buf; std::unique_ptr outbuf; diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-leftover/zeek..stdout b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-leftover/zeek..stdout new file mode 100644 index 0000000000..9097f9f60b --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-leftover/zeek..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Input::EVENT_NEW, aaa +Input::EVENT_NEW, bbb +Input::EVENT_NEW, final diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-no-last-separator/zeek..stdout b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-no-last-separator/zeek..stdout new file mode 100644 index 0000000000..9097f9f60b --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-no-last-separator/zeek..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Input::EVENT_NEW, aaa +Input::EVENT_NEW, bbb +Input::EVENT_NEW, final diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow-long/zeek..stdout b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow-long/zeek..stdout new file mode 100644 index 0000000000..94bf7b6faf --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow-long/zeek..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Input::EVENT_NEW, 24612, binary start\x00\x00\x00\x00, \x00\x00\x00\x00\x00binary done +Input::EVENT_NEW, 3, ccc, ccc +Input::EVENT_NEW, 5, final, final diff --git a/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow/zeek..stdout b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow/zeek..stdout new file mode 100644 index 0000000000..c561637236 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.frameworks.input.raw.executestream-slow/zeek..stdout @@ -0,0 +1,4 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Input::EVENT_NEW, aaa-bbb-ccc +Input::EVENT_NEW, aaa-bbb-ccc +Input::EVENT_NEW, final diff --git a/testing/btest/scripts/base/frameworks/input/raw/executestream-leftover.zeek b/testing/btest/scripts/base/frameworks/input/raw/executestream-leftover.zeek new file mode 100644 index 0000000000..f40a7d52a3 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/executestream-leftover.zeek @@ -0,0 +1,52 @@ +# @TEST-DOC: Launching a program that produces output slowly and strangely separated. +# @TEST-EXEC: chmod +x run.sh +# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff zeek/.stdout + +redef exit_only_after_terminate = T; + +redef Threading::heartbeat_interval = 0.01sec; + +@TEST-START-FILE run.sh +#!/usr/bin/env bash +echo -e -n "aaa\nb" +sleep 0.1 +echo -e -n "bb\nfi" +sleep 0.1 +echo "nal" + +sleep infinity +@TEST-END-FILE + +module A; + +type Val: record { + s: string; +}; + +global lines = 0; + +event one_line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print tpe, s; + ++lines; + + if ( lines == 3 ) + { + Input::remove("input"); + terminate(); + } + } + +event zeek_init() + { + Input::add_event([ + $name="run", + $source="../run.sh |", + $reader=Input::READER_RAW, + $mode=Input::STREAM, + $fields=Val, + $ev=one_line, $want_record=F, + ]); + } diff --git a/testing/btest/scripts/base/frameworks/input/raw/executestream-no-last-separator.zeek b/testing/btest/scripts/base/frameworks/input/raw/executestream-no-last-separator.zeek new file mode 100644 index 0000000000..eeb77ac884 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/executestream-no-last-separator.zeek @@ -0,0 +1,53 @@ +# @TEST-DOC: Launching a program that doesn't end it's final line with a \n +# @TEST-EXEC: chmod +x run.sh +# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff zeek/.stdout + +redef exit_only_after_terminate = T; + +redef Threading::heartbeat_interval = 0.01sec; + +@TEST-START-FILE run.sh +#!/usr/bin/env bash +sleep 0.1 +echo "aaa" +sleep 0.1 +echo "bbb" +sleep 0.1 +echo -n "final" + +sleep 0.1 +exit 0 +@TEST-END-FILE + +module A; + +type Val: record { + s: string; +}; + +global lines = 0; + +event one_line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print tpe, s; + ++lines; + if ( lines == 3 ) + { + Input::remove("input"); + terminate(); + } + } + +event zeek_init() + { + Input::add_event([ + $name="run", + $source="../run.sh |", + $reader=Input::READER_RAW, + $mode=Input::STREAM, + $fields=Val, + $ev=one_line, $want_record=F, + ]); + } diff --git a/testing/btest/scripts/base/frameworks/input/raw/executestream-slow-long.zeek b/testing/btest/scripts/base/frameworks/input/raw/executestream-slow-long.zeek new file mode 100644 index 0000000000..da6e489b4e --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/executestream-slow-long.zeek @@ -0,0 +1,62 @@ +# @TEST-DOC: Launching a program that produces output slowly and exercises buffering. +# @TEST-EXEC: chmod +x run.sh +# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff zeek/.stdout + +redef exit_only_after_terminate = T; + +redef Threading::heartbeat_interval = 0.01sec; + +@TEST-START-FILE run.sh +#!/usr/bin/env bash +sleep 0.1 +echo -n "binary start" +sleep 0.1 +dd if=/dev/zero bs=1 count=8192 +sleep 0.1 +echo -n "binary middle" +sleep 0.1 +dd if=/dev/zero bs=1 count=8192 +sleep 0.1 +dd if=/dev/zero bs=1 count=8192 +sleep 0.1 +echo "binary done" +sleep 0.1 +echo "ccc" +sleep 0.1 +echo "final" + +sleep infinity +@TEST-END-FILE + +module A; + +type Val: record { + s: string; +}; + +global lines = 0; + +event one_line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print tpe,|s|, s[:16], s[-16:]; + ++lines; + if ( lines == 3 ) + { + Input::remove("input"); + terminate(); + } + } + +event zeek_init() + { + Input::add_event([ + $name="run", + $source="../run.sh |", + $reader=Input::READER_RAW, + $mode=Input::STREAM, + $fields=Val, + $ev=one_line, $want_record=F, + ]); + } diff --git a/testing/btest/scripts/base/frameworks/input/raw/executestream-slow.zeek b/testing/btest/scripts/base/frameworks/input/raw/executestream-slow.zeek new file mode 100644 index 0000000000..d221805b33 --- /dev/null +++ b/testing/btest/scripts/base/frameworks/input/raw/executestream-slow.zeek @@ -0,0 +1,55 @@ +# @TEST-DOC: Launching a program that produces output slowly puts the raw reader into an endless loop. +# @TEST-EXEC: chmod +x run.sh +# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT +# @TEST-EXEC: btest-bg-wait 10 +# @TEST-EXEC: btest-diff zeek/.stdout + +redef exit_only_after_terminate = T; + +redef Threading::heartbeat_interval = 0.01sec; + +@TEST-START-FILE run.sh +#!/usr/bin/env bash +sleep 0.1 +echo -n "aaa-" +sleep 0.1 +echo -n "bbb-" +sleep 0.1 +echo "ccc" +sleep 0.1 +echo "aaa-bbb-ccc" +echo "final" + +sleep infinity +@TEST-END-FILE + +module A; + +type Val: record { + s: string; +}; + +global lines = 0; + +event one_line(description: Input::EventDescription, tpe: Input::Event, s: string) + { + print tpe, s; + ++lines; + if ( lines == 3 ) + { + Input::remove("input"); + terminate(); + } + } + +event zeek_init() + { + Input::add_event([ + $name="run", + $source="../run.sh |", + $reader=Input::READER_RAW, + $mode=Input::STREAM, + $fields=Val, + $ev=one_line, $want_record=F, + ]); + }