input/Raw: Rework GetLine()

This isn't a straightforward fix, unfortunately. The existing GetLine()
implementation didn't deal well with input that's incrementally produced
where individually read chunks wouldn't end with the separator.

The prior implementation increased the buffer each time it failed to find
a separator in the current buffer, but then also ended up not searching the
full new buffer size for the terminator, doing that endlessly.

This change reworks the Raw reader to rely only on bufpos for reading
and searching purposes, and to skip reallocating the buffer if it
wasn't actually exhausted.

Closes #3957
This commit is contained in:
Arne Welzel 2024-10-02 09:27:06 +02:00 committed by Christian Kreibich
parent 4656faed6c
commit ecfa03ea1d
10 changed files with 281 additions and 26 deletions

View file

@ -46,6 +46,7 @@ Raw::Raw(ReaderFrontend* frontend) : ReaderBackend(frontend), file(nullptr, fclo
sep_length = BifConst::InputRaw::record_separator->Len(); sep_length = BifConst::InputRaw::record_separator->Len();
bufpos = 0; bufpos = 0;
bufsize = 0;
stdin_fileno = fileno(stdin); stdin_fileno = fileno(stdin);
stdout_fileno = fileno(stdout); stdout_fileno = fileno(stdout);
@ -420,59 +421,74 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
int64_t Raw::GetLine(FILE* arg_file) { int64_t Raw::GetLine(FILE* arg_file) {
errno = 0; errno = 0;
int pos = 0; // strstr_n only works on ints - so no use to use something different here
int offset = 0;
if ( ! buf ) if ( ! buf ) {
buf = std::unique_ptr<char[]>(new char[block_size]); buf = std::unique_ptr<char[]>(new char[block_size]);
bufpos = 0;
int repeats = 1; bufsize = block_size;
}
for ( ;; ) { for ( ;; ) {
size_t readbytes = fread(buf.get() + bufpos + offset, 1, block_size - bufpos, arg_file); size_t readbytes = fread(buf.get() + bufpos, 1, bufsize - bufpos, arg_file);
pos += bufpos + readbytes;
// printf("Pos: %d\n", pos);
bufpos = offset = 0; // read full block size in next read...
if ( pos == 0 && errno != 0 ) bufpos = bufpos + readbytes;
// Nothing in the buffer and errno set, yield.
if ( bufpos == 0 && errno != 0 )
break; break;
// researching everything each time is a bit... cpu-intensive. But otherwise we have // researching everything each time is a bit... cpu-intensive. But otherwise we have
// to deal with situations where the separator is multi-character and split over multiple // to deal with situations where the separator is multi-character and split over multiple
// reads... // reads...
int found = util::strstr_n(pos, (unsigned char*)buf.get(), separator.size(), (unsigned char*)separator.c_str()); //
// memmem() would be more appropriate, but not available on Windows.
int found = util::strstr_n(bufpos, reinterpret_cast<u_char*>(buf.get()), separator.size(),
reinterpret_cast<const u_char*>(separator.c_str()));
if ( found == -1 ) { if ( found == -1 ) {
// we did not find it and have to search again in the next try. resize buffer.... // we did not find it and have to search again in the next try.
// but first check if we encountered the file end - because if we did this was it. // but first check if we encountered the file end - because if we did this was it.
if ( feof(arg_file) != 0 ) { if ( feof(arg_file) != 0 ) {
if ( pos == 0 ) if ( bufpos == 0 )
return -1; // signal EOF - and that we had no more data. return -1; // signal EOF - and that we had no more data.
else { else {
outbuf = std::move(buf); // buf is null after this outbuf = std::move(buf); // buf is null after this
return pos; return bufpos; // flush out remaining buffered data as line
} }
} }
repeats++; // No separator found and buffer full, realloc and retry reading more right away.
// bah, we cannot use realloc because we would have to change the delete in the manager if ( bufpos == bufsize ) {
// to a free. std::unique_ptr<char[]> newbuf = std::unique_ptr<char[]>(new char[bufsize + block_size]);
std::unique_ptr<char[]> newbuf = std::unique_ptr<char[]>(new char[block_size * repeats]); memcpy(newbuf.get(), buf.get(), bufsize);
memcpy(newbuf.get(), buf.get(), block_size * (repeats - 1));
buf = std::move(newbuf); buf = std::move(newbuf);
offset = block_size * (repeats - 1); bufsize = bufsize + block_size;
} }
else { else {
// Short or empty read, some data in the buffer, but no separator found
// and also not EOF: This is likely reading from a pipe where the separator
// wasn't yet produced. Yield to retry on the next heartbeat.
return -2;
}
}
else {
size_t sep_idx = static_cast<size_t>(found);
assert(sep_idx <= bufsize - sep_length);
size_t remaining = bufpos - sep_idx - sep_length;
outbuf = std::move(buf); outbuf = std::move(buf);
if ( found < pos ) { if ( remaining > 0 ) {
// we have leftovers. copy them into the buffer for the next line // we have leftovers. copy them into the buffer for the next line
assert(remaining <= block_size);
buf = std::unique_ptr<char[]>(new char[block_size]); buf = std::unique_ptr<char[]>(new char[block_size]);
memcpy(buf.get(), outbuf.get() + found + sep_length, pos - found - sep_length); bufpos = remaining;
bufpos = pos - found - sep_length; bufsize = block_size;
memcpy(buf.get(), outbuf.get() + sep_idx + sep_length, remaining);
} }
return found; return sep_idx;
} }
} }

View file

@ -58,7 +58,8 @@ private:
std::string separator; std::string separator;
unsigned int sep_length; // length of the separator unsigned int sep_length; // length of the separator
int bufpos; size_t bufpos; // Where in buf to read more data.
size_t bufsize; // Currently allocated size of buf.
std::unique_ptr<char[]> buf; std::unique_ptr<char[]> buf;
std::unique_ptr<char[]> outbuf; std::unique_ptr<char[]> outbuf;

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Input::EVENT_NEW, aaa
Input::EVENT_NEW, bbb
Input::EVENT_NEW, final

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Input::EVENT_NEW, aaa
Input::EVENT_NEW, bbb
Input::EVENT_NEW, final

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Input::EVENT_NEW, 24612, binary start\x00\x00\x00\x00, \x00\x00\x00\x00\x00binary done
Input::EVENT_NEW, 3, ccc, ccc
Input::EVENT_NEW, 5, final, final

View file

@ -0,0 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Input::EVENT_NEW, aaa-bbb-ccc
Input::EVENT_NEW, aaa-bbb-ccc
Input::EVENT_NEW, final

View file

@ -0,0 +1,52 @@
# @TEST-DOC: Launching a program that produces output slowly and strangely separated.
# @TEST-EXEC: chmod +x run.sh
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff zeek/.stdout
# Keep Zeek running until the script calls terminate() explicitly.
redef exit_only_after_terminate = T;
# Short threading heartbeat.
# NOTE(review): presumably so the raw reader retries pending reads quickly — confirm.
redef Threading::heartbeat_interval = 0.01sec;
@TEST-START-FILE run.sh
#!/usr/bin/env bash
# Produce the lines "aaa", "bbb" and "final", but split the writes so that
# the first two chunks end in the middle of a line rather than on a newline.
echo -e -n "aaa\nb"
sleep 0.1
echo -e -n "bb\nfi"
sleep 0.1
echo "nal"
# Stay alive instead of exiting after the last line.
sleep infinity
@TEST-END-FILE
module A;
# Field layout of the input stream: a single string column.
type Val: record {
s: string;
};
# Number of lines received by one_line so far.
global lines = 0;
# Invoked for every line delivered by the input stream. Prints the event type
# and the line, counts it, and after the third line removes the stream and
# terminates Zeek.
#
# description: the description record the stream was registered with.
# tpe: the kind of input event (e.g. Input::EVENT_NEW).
# s: the line that was read.
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
	{
	print tpe, s;
	++lines;

	if ( lines == 3 )
		{
		# Remove the stream under the name it was registered with. The
		# previously hard-coded "input" did not match the stream's name.
		Input::remove(description$name);
		terminate();
		}
	}
# Registers the raw-reader event stream "run", which reads from "../run.sh |"
# in streaming mode and hands each line to one_line as a plain string.
event zeek_init()
	{
	Input::add_event([
		$source="../run.sh |",
		$name="run",
		$reader=Input::READER_RAW,
		$mode=Input::STREAM,
		$fields=Val,
		$want_record=F,
		$ev=one_line
	]);
	}

View file

@ -0,0 +1,53 @@
# @TEST-DOC: Launching a program that doesn't end its final line with a \n
# @TEST-EXEC: chmod +x run.sh
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff zeek/.stdout
# Keep Zeek running until the script calls terminate() explicitly.
redef exit_only_after_terminate = T;
# Short threading heartbeat.
# NOTE(review): presumably so the raw reader retries pending reads quickly — confirm.
redef Threading::heartbeat_interval = 0.01sec;
@TEST-START-FILE run.sh
#!/usr/bin/env bash
# Two newline-terminated lines, produced slowly...
sleep 0.1
echo "aaa"
sleep 0.1
echo "bbb"
sleep 0.1
# ...followed by a final line without a trailing newline.
echo -n "final"
sleep 0.1
# Exit, so the unterminated last line is followed only by end-of-file.
exit 0
@TEST-END-FILE
module A;
# Field layout of the input stream: a single string column.
type Val: record {
s: string;
};
# Number of lines received by one_line so far.
global lines = 0;
# Invoked for every line delivered by the input stream. Prints the event type
# and the line, counts it, and after the third line removes the stream and
# terminates Zeek.
#
# description: the description record the stream was registered with.
# tpe: the kind of input event (e.g. Input::EVENT_NEW).
# s: the line that was read.
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
	{
	print tpe, s;
	++lines;

	if ( lines == 3 )
		{
		# Remove the stream under the name it was registered with. The
		# previously hard-coded "input" did not match the stream's name.
		Input::remove(description$name);
		terminate();
		}
	}
# Registers the raw-reader event stream "run", which reads from "../run.sh |"
# in streaming mode and hands each line to one_line as a plain string.
event zeek_init()
	{
	Input::add_event([
		$source="../run.sh |",
		$name="run",
		$reader=Input::READER_RAW,
		$mode=Input::STREAM,
		$fields=Val,
		$want_record=F,
		$ev=one_line
	]);
	}

View file

@ -0,0 +1,62 @@
# @TEST-DOC: Launching a program that produces output slowly and exercises buffering.
# @TEST-EXEC: chmod +x run.sh
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff zeek/.stdout
# Keep Zeek running until the script calls terminate() explicitly.
redef exit_only_after_terminate = T;
# Short threading heartbeat.
# NOTE(review): presumably so the raw reader retries pending reads quickly — confirm.
redef Threading::heartbeat_interval = 0.01sec;
@TEST-START-FILE run.sh
#!/usr/bin/env bash
# First line: text markers interleaved with three runs of 8192 zero bytes,
# written in separate chunks and only terminated by the newline after
# "binary done". Total length before the newline:
# 12 + 8192 + 13 + 8192 + 8192 + 11 = 24612 bytes.
sleep 0.1
echo -n "binary start"
sleep 0.1
dd if=/dev/zero bs=1 count=8192
sleep 0.1
echo -n "binary middle"
sleep 0.1
dd if=/dev/zero bs=1 count=8192
sleep 0.1
dd if=/dev/zero bs=1 count=8192
sleep 0.1
echo "binary done"
sleep 0.1
# Two short, newline-terminated lines afterwards.
echo "ccc"
sleep 0.1
echo "final"
# Stay alive instead of exiting after the last line.
sleep infinity
@TEST-END-FILE
module A;
# Field layout of the input stream: a single string column.
type Val: record {
s: string;
};
# Number of lines received by one_line so far.
global lines = 0;
# Invoked for every line delivered by the input stream. Prints the event type,
# the line's length and its first and last 16 bytes, counts the line, and
# after the third line removes the stream and terminates Zeek.
#
# description: the description record the stream was registered with.
# tpe: the kind of input event (e.g. Input::EVENT_NEW).
# s: the line that was read.
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
	{
	# Only print the length and both ends of the line rather than all of it.
	print tpe,|s|, s[:16], s[-16:];
	++lines;

	if ( lines == 3 )
		{
		# Remove the stream under the name it was registered with. The
		# previously hard-coded "input" did not match the stream's name.
		Input::remove(description$name);
		terminate();
		}
	}
# Registers the raw-reader event stream "run", which reads from "../run.sh |"
# in streaming mode and hands each line to one_line as a plain string.
event zeek_init()
	{
	Input::add_event([
		$source="../run.sh |",
		$name="run",
		$reader=Input::READER_RAW,
		$mode=Input::STREAM,
		$fields=Val,
		$want_record=F,
		$ev=one_line
	]);
	}

View file

@ -0,0 +1,55 @@
# @TEST-DOC: Regression test: launching a program that produces output slowly used to put the raw reader into an endless loop.
# @TEST-EXEC: chmod +x run.sh
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
# @TEST-EXEC: btest-bg-wait 10
# @TEST-EXEC: btest-diff zeek/.stdout
# Keep Zeek running until the script calls terminate() explicitly.
redef exit_only_after_terminate = T;
# Short threading heartbeat.
# NOTE(review): presumably so the raw reader retries pending reads quickly — confirm.
redef Threading::heartbeat_interval = 0.01sec;
@TEST-START-FILE run.sh
#!/usr/bin/env bash
# First line "aaa-bbb-ccc" is written in three separate chunks; only the
# last chunk carries the newline.
sleep 0.1
echo -n "aaa-"
sleep 0.1
echo -n "bbb-"
sleep 0.1
echo "ccc"
sleep 0.1
# Two complete lines written back to back.
echo "aaa-bbb-ccc"
echo "final"
# Stay alive instead of exiting after the last line.
sleep infinity
@TEST-END-FILE
module A;
# Field layout of the input stream: a single string column.
type Val: record {
s: string;
};
# Number of lines received by one_line so far.
global lines = 0;
# Invoked for every line delivered by the input stream. Prints the event type
# and the line, counts it, and after the third line removes the stream and
# terminates Zeek.
#
# description: the description record the stream was registered with.
# tpe: the kind of input event (e.g. Input::EVENT_NEW).
# s: the line that was read.
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
	{
	print tpe, s;
	++lines;

	if ( lines == 3 )
		{
		# Remove the stream under the name it was registered with. The
		# previously hard-coded "input" did not match the stream's name.
		Input::remove(description$name);
		terminate();
		}
	}
# Registers the raw-reader event stream "run", which reads from "../run.sh |"
# in streaming mode and hands each line to one_line as a plain string.
event zeek_init()
	{
	Input::add_event([
		$source="../run.sh |",
		$name="run",
		$reader=Input::READER_RAW,
		$mode=Input::STREAM,
		$fields=Val,
		$want_record=F,
		$ev=one_line
	]);
	}