Streaming reads from external commands work without blocking anything.

This commit is contained in:
Bernhard Amann 2013-03-15 13:58:41 -07:00
parent f2d67b5829
commit fc42c71dfa
4 changed files with 235 additions and 15 deletions

View file

@ -11,6 +11,7 @@
#include <stdio.h> #include <stdio.h>
#include <fcntl.h> #include <fcntl.h>
#include <errno.h> #include <errno.h>
#include <signal.h>
using namespace input::reader; using namespace input::reader;
using threading::Value; using threading::Value;
@ -38,6 +39,8 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
assert(stdin_fileno == 0); assert(stdin_fileno == 0);
assert(stdout_fileno == 1); assert(stdout_fileno == 1);
assert(stderr_fileno == 2); assert(stderr_fileno == 2);
childpid = -1;
} }
Raw::~Raw() Raw::~Raw()
@ -49,12 +52,15 @@ void Raw::DoClose()
{ {
if ( file != 0 ) if ( file != 0 )
CloseInput(); CloseInput();
if ( execute && childpid > 0 )
// kill child process
kill(childpid, 9); // TERMINATOR
} }
bool Raw::Execute() bool Raw::Execute()
{ {
int stdout_pipe[2]; int stdout_pipe[2];
pid_t pid;
if (pipe(stdout_pipe) != 0) if (pipe(stdout_pipe) != 0)
{ {
@ -62,13 +68,13 @@ bool Raw::Execute()
return false; return false;
} }
pid = fork(); childpid = fork();
if ( pid < 0 ) if ( childpid < 0 )
{ {
Error(Fmt("Could not create child process: %d", errno)); Error(Fmt("Could not create child process: %d", errno));
return false; return false;
} }
else if ( pid == 0 ) else if ( childpid == 0 )
{ {
// we are the child. // we are the child.
close(stdout_pipe[stdin_fileno]); close(stdout_pipe[stdin_fileno]);
@ -82,6 +88,10 @@ bool Raw::Execute()
{ {
// we are the parent // we are the parent
close(stdout_pipe[stdout_fileno]); close(stdout_pipe[stdout_fileno]);
if ( Info().mode == MODE_STREAM )
fcntl(stdout_pipe[stdin_fileno], F_SETFL, O_NONBLOCK);
file = fdopen(stdout_pipe[stdin_fileno], "r"); file = fdopen(stdout_pipe[stdin_fileno], "r");
if ( file == 0 ) if ( file == 0 )
{ {
@ -102,6 +112,7 @@ bool Raw::OpenInput()
else else
{ {
file = fopen(fname.c_str(), "r"); file = fopen(fname.c_str(), "r");
fcntl(fileno(file), F_SETFD, FD_CLOEXEC);
if ( !file ) if ( !file )
{ {
Error(Fmt("Init: cannot open %s", fname.c_str())); Error(Fmt("Init: cannot open %s", fname.c_str()));
@ -177,15 +188,6 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
execute = true; execute = true;
fname = source.substr(0, fname.length() - 1); fname = source.substr(0, fname.length() - 1);
/*
if ( (info.mode != MODE_MANUAL) )
{
Error(Fmt("Unsupported read mode %d for source %s in execution mode",
info.mode, fname.c_str()));
return false;
}
*/
result = OpenInput(); result = OpenInput();
} }
@ -229,7 +231,7 @@ int64_t Raw::GetLine()
pos += bufpos + readbytes; pos += bufpos + readbytes;
bufpos = 0; // read full block size in next read... bufpos = 0; // read full block size in next read...
if ( errno != 0 ) if ( pos == 0 && errno != 0 )
break; break;
char* token = strnstr(buf, separator.c_str(), block_size*repeats-pos); char* token = strnstr(buf, separator.c_str(), block_size*repeats-pos);
@ -279,7 +281,7 @@ int64_t Raw::GetLine()
if ( errno == 0 ) { if ( errno == 0 ) {
assert(false); assert(false);
} else if ( errno == EAGAIN || errno == EAGAIN || errno == EINTR ) { } else if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
return -2; return -2;
} else { } else {
// an error code we did no expect. This probably is bad. // an error code we did no expect. This probably is bad.
@ -343,6 +345,8 @@ bool Raw::DoUpdate()
for ( ;; ) for ( ;; )
{ {
int64_t length = GetLine(); int64_t length = GetLine();
//printf("Read %lld bytes", length);
if ( length == -3 ) if ( length == -3 )
return false; return false;
else if ( length == -2 || length == -1 ) else if ( length == -2 || length == -1 )

View file

@ -50,6 +50,8 @@ private:
int stdin_fileno; int stdin_fileno;
int stdout_fileno; int stdout_fileno;
int stderr_fileno; int stderr_fileno;
pid_t childpid;
}; };
} }

View file

@ -0,0 +1,153 @@
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
q3r3057fdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
sdfs\d
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
dfsdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
sdf
[source=tail -f ../input.log |, reader=Input::READER_RAW, mode=Input::STREAM, name=input, fields=<no value description>, want_record=F, ev=line
{
print A::outfile, A::description;
print A::outfile, A::tpe;
print A::outfile, A::s;
A::try = A::try + 1;
if (8 == A::try)
{
print A::outfile, done;
close(A::outfile);
Input::remove(input);
terminate();
}
}, config={
}]
Input::EVENT_NEW
3rw43wRRERLlL#RWERERERE.
done

View file

@ -0,0 +1,61 @@
# @TEST-EXEC: cp input1.log input.log
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input2.log >> input.log
# @TEST-EXEC: sleep 3
# @TEST-EXEC: cat input3.log >> input.log
# @TEST-EXEC: btest-bg-wait -k 5
# @TEST-EXEC: btest-diff out
redef exit_only_after_terminate = T;
@TEST-START-FILE input1.log
sdfkh:KH;fdkncv;ISEUp34:Fkdj;YVpIODhfDF
@TEST-END-FILE
@TEST-START-FILE input2.log
DSF"DFKJ"SDFKLh304yrsdkfj@#(*U$34jfDJup3UF
q3r3057fdf
@TEST-END-FILE
@TEST-START-FILE input3.log
sdfs\d
dfsdf
sdf
3rw43wRRERLlL#RWERERERE.
@TEST-END-FILE
@load base/frameworks/communication # let network-time run
module A;
type Val: record {
s: string;
};
global try: count;
global outfile: file;
event line(description: Input::EventDescription, tpe: Input::Event, s: string)
{
print outfile, description;
print outfile, tpe;
print outfile, s;
try = try + 1;
if ( try == 8 )
{
print outfile, "done";
close(outfile);
Input::remove("input");
terminate();
}
}
event bro_init()
{
outfile = open("../out");
try = 0;
Input::add_event([$source="tail -f ../input.log |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F]);
}