mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
Merge branch 'topic/awelzel/3957-raw-reader-spinning'
* topic/awelzel/3957-raw-reader-spinning:
input/Raw: Rework GetLine()
(cherry picked from commit 2a23e9fc19
)
This commit is contained in:
parent
f5fefd17df
commit
300b7a11ac
12 changed files with 292 additions and 27 deletions
10
CHANGES
10
CHANGES
|
@ -1,3 +1,13 @@
|
||||||
|
7.0.3-10 | 2024-11-14 11:30:00 -0700
|
||||||
|
|
||||||
|
* GH-3957: input/Raw: Rework GetLine() (Arne Welzel, Corelight)
|
||||||
|
|
||||||
|
(cherry picked from commit 2a23e9fc1962419e41133689c2a682455d24e35e)
|
||||||
|
|
||||||
|
* GH-215: POP3: Rework unbounded pending command fix (Arne Welzel, Corelight)
|
||||||
|
|
||||||
|
(cherry picked from commit 2a23e9fc1962419e41133689c2a682455d24e35e)
|
||||||
|
|
||||||
7.0.3-9 | 2024-11-14 10:21:55 -0700
|
7.0.3-9 | 2024-11-14 10:21:55 -0700
|
||||||
|
|
||||||
* import of GH-4022 BTest additions (Vern Paxson, Corelight)
|
* import of GH-4022 BTest additions (Vern Paxson, Corelight)
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
7.0.3-9
|
7.0.3-10
|
||||||
|
|
|
@ -46,6 +46,7 @@ Raw::Raw(ReaderFrontend* frontend) : ReaderBackend(frontend), file(nullptr, fclo
|
||||||
sep_length = BifConst::InputRaw::record_separator->Len();
|
sep_length = BifConst::InputRaw::record_separator->Len();
|
||||||
|
|
||||||
bufpos = 0;
|
bufpos = 0;
|
||||||
|
bufsize = 0;
|
||||||
|
|
||||||
stdin_fileno = fileno(stdin);
|
stdin_fileno = fileno(stdin);
|
||||||
stdout_fileno = fileno(stdout);
|
stdout_fileno = fileno(stdout);
|
||||||
|
@ -420,59 +421,74 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
|
||||||
|
|
||||||
int64_t Raw::GetLine(FILE* arg_file) {
|
int64_t Raw::GetLine(FILE* arg_file) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
int pos = 0; // strstr_n only works on ints - so no use to use something different here
|
|
||||||
int offset = 0;
|
|
||||||
|
|
||||||
if ( ! buf )
|
if ( ! buf ) {
|
||||||
buf = std::unique_ptr<char[]>(new char[block_size]);
|
buf = std::unique_ptr<char[]>(new char[block_size]);
|
||||||
|
bufpos = 0;
|
||||||
int repeats = 1;
|
bufsize = block_size;
|
||||||
|
}
|
||||||
|
|
||||||
for ( ;; ) {
|
for ( ;; ) {
|
||||||
size_t readbytes = fread(buf.get() + bufpos + offset, 1, block_size - bufpos, arg_file);
|
size_t readbytes = fread(buf.get() + bufpos, 1, bufsize - bufpos, arg_file);
|
||||||
pos += bufpos + readbytes;
|
|
||||||
// printf("Pos: %d\n", pos);
|
|
||||||
bufpos = offset = 0; // read full block size in next read...
|
|
||||||
|
|
||||||
if ( pos == 0 && errno != 0 )
|
bufpos = bufpos + readbytes;
|
||||||
|
|
||||||
|
// Nothing in the buffer and errno set, yield.
|
||||||
|
if ( bufpos == 0 && errno != 0 )
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// researching everything each time is a bit... cpu-intensive. But otherwise we have
|
// researching everything each time is a bit... cpu-intensive. But otherwise we have
|
||||||
// to deal with situations where the separator is multi-character and split over multiple
|
// to deal with situations where the separator is multi-character and split over multiple
|
||||||
// reads...
|
// reads...
|
||||||
int found = util::strstr_n(pos, (unsigned char*)buf.get(), separator.size(), (unsigned char*)separator.c_str());
|
//
|
||||||
|
// memmem() would be more appropriate, but not available on Windows.
|
||||||
|
int found = util::strstr_n(bufpos, reinterpret_cast<u_char*>(buf.get()), separator.size(),
|
||||||
|
reinterpret_cast<const u_char*>(separator.c_str()));
|
||||||
|
|
||||||
if ( found == -1 ) {
|
if ( found == -1 ) {
|
||||||
// we did not find it and have to search again in the next try. resize buffer....
|
// we did not find it and have to search again in the next try.
|
||||||
// but first check if we encountered the file end - because if we did this was it.
|
// but first check if we encountered the file end - because if we did this was it.
|
||||||
if ( feof(arg_file) != 0 ) {
|
if ( feof(arg_file) != 0 ) {
|
||||||
if ( pos == 0 )
|
if ( bufpos == 0 )
|
||||||
return -1; // signal EOF - and that we had no more data.
|
return -1; // signal EOF - and that we had no more data.
|
||||||
else {
|
else {
|
||||||
outbuf = std::move(buf); // buf is null after this
|
outbuf = std::move(buf); // buf is null after this
|
||||||
return pos;
|
return bufpos; // flush out remaining buffered data as line
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
repeats++;
|
// No separator found and buffer full, realloc and retry reading more right away.
|
||||||
// bah, we cannot use realloc because we would have to change the delete in the manager
|
if ( bufpos == bufsize ) {
|
||||||
// to a free.
|
std::unique_ptr<char[]> newbuf = std::unique_ptr<char[]>(new char[bufsize + block_size]);
|
||||||
std::unique_ptr<char[]> newbuf = std::unique_ptr<char[]>(new char[block_size * repeats]);
|
memcpy(newbuf.get(), buf.get(), bufsize);
|
||||||
memcpy(newbuf.get(), buf.get(), block_size * (repeats - 1));
|
|
||||||
buf = std::move(newbuf);
|
buf = std::move(newbuf);
|
||||||
offset = block_size * (repeats - 1);
|
bufsize = bufsize + block_size;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
// Short or empty read, some data in the buffer, but no separator found
|
||||||
|
// and also not EOF: This is likely reading from a pipe where the separator
|
||||||
|
// wasn't yet produced. Yield to retry on the next heartbeat.
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
size_t sep_idx = static_cast<size_t>(found);
|
||||||
|
assert(sep_idx <= bufsize - sep_length);
|
||||||
|
size_t remaining = bufpos - sep_idx - sep_length;
|
||||||
|
|
||||||
outbuf = std::move(buf);
|
outbuf = std::move(buf);
|
||||||
|
|
||||||
if ( found < pos ) {
|
if ( remaining > 0 ) {
|
||||||
// we have leftovers. copy them into the buffer for the next line
|
// we have leftovers. copy them into the buffer for the next line
|
||||||
|
assert(remaining <= block_size);
|
||||||
buf = std::unique_ptr<char[]>(new char[block_size]);
|
buf = std::unique_ptr<char[]>(new char[block_size]);
|
||||||
memcpy(buf.get(), outbuf.get() + found + sep_length, pos - found - sep_length);
|
bufpos = remaining;
|
||||||
bufpos = pos - found - sep_length;
|
bufsize = block_size;
|
||||||
|
|
||||||
|
memcpy(buf.get(), outbuf.get() + sep_idx + sep_length, remaining);
|
||||||
}
|
}
|
||||||
|
|
||||||
return found;
|
return sep_idx;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,8 @@ private:
|
||||||
std::string separator;
|
std::string separator;
|
||||||
unsigned int sep_length; // length of the separator
|
unsigned int sep_length; // length of the separator
|
||||||
|
|
||||||
int bufpos;
|
size_t bufpos; // Where in buf to read more data.
|
||||||
|
size_t bufsize; // Currently allocated size of buf.
|
||||||
std::unique_ptr<char[]> buf;
|
std::unique_ptr<char[]> buf;
|
||||||
std::unique_ptr<char[]> outbuf;
|
std::unique_ptr<char[]> outbuf;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
Input::EVENT_NEW, aaa
|
||||||
|
Input::EVENT_NEW, bbb
|
||||||
|
Input::EVENT_NEW, final
|
|
@ -0,0 +1,4 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
Input::EVENT_NEW, aaa
|
||||||
|
Input::EVENT_NEW, bbb
|
||||||
|
Input::EVENT_NEW, final
|
|
@ -0,0 +1,4 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
Input::EVENT_NEW, 24612, binary start\x00\x00\x00\x00, \x00\x00\x00\x00\x00binary done
|
||||||
|
Input::EVENT_NEW, 3, ccc, ccc
|
||||||
|
Input::EVENT_NEW, 5, final, final
|
|
@ -0,0 +1,4 @@
|
||||||
|
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
|
||||||
|
Input::EVENT_NEW, aaa-bbb-ccc
|
||||||
|
Input::EVENT_NEW, aaa-bbb-ccc
|
||||||
|
Input::EVENT_NEW, final
|
|
@ -0,0 +1,52 @@
|
||||||
|
# @TEST-DOC: Launching a program that produces output slowly and strangely separated.
|
||||||
|
# @TEST-EXEC: chmod +x run.sh
|
||||||
|
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
|
||||||
|
# @TEST-EXEC: btest-bg-wait 10
|
||||||
|
# @TEST-EXEC: btest-diff zeek/.stdout
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
redef Threading::heartbeat_interval = 0.01sec;
|
||||||
|
|
||||||
|
@TEST-START-FILE run.sh
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
echo -e -n "aaa\nb"
|
||||||
|
sleep 0.1
|
||||||
|
echo -e -n "bb\nfi"
|
||||||
|
sleep 0.1
|
||||||
|
echo "nal"
|
||||||
|
|
||||||
|
sleep infinity
|
||||||
|
@TEST-END-FILE
|
||||||
|
|
||||||
|
module A;
|
||||||
|
|
||||||
|
type Val: record {
|
||||||
|
s: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
global lines = 0;
|
||||||
|
|
||||||
|
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||||
|
{
|
||||||
|
print tpe, s;
|
||||||
|
++lines;
|
||||||
|
|
||||||
|
if ( lines == 3 )
|
||||||
|
{
|
||||||
|
Input::remove("input");
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Input::add_event([
|
||||||
|
$name="run",
|
||||||
|
$source="../run.sh |",
|
||||||
|
$reader=Input::READER_RAW,
|
||||||
|
$mode=Input::STREAM,
|
||||||
|
$fields=Val,
|
||||||
|
$ev=one_line, $want_record=F,
|
||||||
|
]);
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
# @TEST-DOC: Launching a program that doesn't end it's final line with a \n
|
||||||
|
# @TEST-EXEC: chmod +x run.sh
|
||||||
|
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
|
||||||
|
# @TEST-EXEC: btest-bg-wait 10
|
||||||
|
# @TEST-EXEC: btest-diff zeek/.stdout
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
redef Threading::heartbeat_interval = 0.01sec;
|
||||||
|
|
||||||
|
@TEST-START-FILE run.sh
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
sleep 0.1
|
||||||
|
echo "aaa"
|
||||||
|
sleep 0.1
|
||||||
|
echo "bbb"
|
||||||
|
sleep 0.1
|
||||||
|
echo -n "final"
|
||||||
|
|
||||||
|
sleep 0.1
|
||||||
|
exit 0
|
||||||
|
@TEST-END-FILE
|
||||||
|
|
||||||
|
module A;
|
||||||
|
|
||||||
|
type Val: record {
|
||||||
|
s: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
global lines = 0;
|
||||||
|
|
||||||
|
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||||
|
{
|
||||||
|
print tpe, s;
|
||||||
|
++lines;
|
||||||
|
if ( lines == 3 )
|
||||||
|
{
|
||||||
|
Input::remove("input");
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Input::add_event([
|
||||||
|
$name="run",
|
||||||
|
$source="../run.sh |",
|
||||||
|
$reader=Input::READER_RAW,
|
||||||
|
$mode=Input::STREAM,
|
||||||
|
$fields=Val,
|
||||||
|
$ev=one_line, $want_record=F,
|
||||||
|
]);
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
# @TEST-DOC: Launching a program that produces output slowly and exercises buffering.
|
||||||
|
# @TEST-EXEC: chmod +x run.sh
|
||||||
|
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
|
||||||
|
# @TEST-EXEC: btest-bg-wait 10
|
||||||
|
# @TEST-EXEC: btest-diff zeek/.stdout
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
redef Threading::heartbeat_interval = 0.01sec;
|
||||||
|
|
||||||
|
@TEST-START-FILE run.sh
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
sleep 0.1
|
||||||
|
echo -n "binary start"
|
||||||
|
sleep 0.1
|
||||||
|
dd if=/dev/zero bs=1 count=8192
|
||||||
|
sleep 0.1
|
||||||
|
echo -n "binary middle"
|
||||||
|
sleep 0.1
|
||||||
|
dd if=/dev/zero bs=1 count=8192
|
||||||
|
sleep 0.1
|
||||||
|
dd if=/dev/zero bs=1 count=8192
|
||||||
|
sleep 0.1
|
||||||
|
echo "binary done"
|
||||||
|
sleep 0.1
|
||||||
|
echo "ccc"
|
||||||
|
sleep 0.1
|
||||||
|
echo "final"
|
||||||
|
|
||||||
|
sleep infinity
|
||||||
|
@TEST-END-FILE
|
||||||
|
|
||||||
|
module A;
|
||||||
|
|
||||||
|
type Val: record {
|
||||||
|
s: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
global lines = 0;
|
||||||
|
|
||||||
|
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||||
|
{
|
||||||
|
print tpe,|s|, s[:16], s[-16:];
|
||||||
|
++lines;
|
||||||
|
if ( lines == 3 )
|
||||||
|
{
|
||||||
|
Input::remove("input");
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Input::add_event([
|
||||||
|
$name="run",
|
||||||
|
$source="../run.sh |",
|
||||||
|
$reader=Input::READER_RAW,
|
||||||
|
$mode=Input::STREAM,
|
||||||
|
$fields=Val,
|
||||||
|
$ev=one_line, $want_record=F,
|
||||||
|
]);
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
# @TEST-DOC: Launching a program that produces output slowly puts the raw reader into an endless loop.
|
||||||
|
# @TEST-EXEC: chmod +x run.sh
|
||||||
|
# @TEST-EXEC: btest-bg-run zeek zeek -b %INPUT
|
||||||
|
# @TEST-EXEC: btest-bg-wait 10
|
||||||
|
# @TEST-EXEC: btest-diff zeek/.stdout
|
||||||
|
|
||||||
|
redef exit_only_after_terminate = T;
|
||||||
|
|
||||||
|
redef Threading::heartbeat_interval = 0.01sec;
|
||||||
|
|
||||||
|
@TEST-START-FILE run.sh
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
sleep 0.1
|
||||||
|
echo -n "aaa-"
|
||||||
|
sleep 0.1
|
||||||
|
echo -n "bbb-"
|
||||||
|
sleep 0.1
|
||||||
|
echo "ccc"
|
||||||
|
sleep 0.1
|
||||||
|
echo "aaa-bbb-ccc"
|
||||||
|
echo "final"
|
||||||
|
|
||||||
|
sleep infinity
|
||||||
|
@TEST-END-FILE
|
||||||
|
|
||||||
|
module A;
|
||||||
|
|
||||||
|
type Val: record {
|
||||||
|
s: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
global lines = 0;
|
||||||
|
|
||||||
|
event one_line(description: Input::EventDescription, tpe: Input::Event, s: string)
|
||||||
|
{
|
||||||
|
print tpe, s;
|
||||||
|
++lines;
|
||||||
|
if ( lines == 3 )
|
||||||
|
{
|
||||||
|
Input::remove("input");
|
||||||
|
terminate();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
Input::add_event([
|
||||||
|
$name="run",
|
||||||
|
$source="../run.sh |",
|
||||||
|
$reader=Input::READER_RAW,
|
||||||
|
$mode=Input::STREAM,
|
||||||
|
$fields=Val,
|
||||||
|
$ev=one_line, $want_record=F,
|
||||||
|
]);
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue