allow sending data to stdin of child process

This commit is contained in:
Bernhard Amann 2013-03-15 15:47:20 -07:00
parent fc42c71dfa
commit 3aeec7ec14
5 changed files with 166 additions and 24 deletions

View file

@ -19,6 +19,7 @@ using threading::Field;
const int Raw::block_size = 512; // how big do we expect our chunks of data to be...
Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
{
file = 0;
@ -41,6 +42,8 @@ Raw::Raw(ReaderFrontend *frontend) : ReaderBackend(frontend)
assert(stderr_fileno == 2);
childpid = -1;
stdin_towrite = 0; // by default do not open stdin
}
Raw::~Raw()
@ -53,6 +56,7 @@ void Raw::DoClose()
if ( file != 0 )
CloseInput();
if ( execute && childpid > 0 )
// kill child process
kill(childpid, 9); // TERMINATOR
@ -60,9 +64,8 @@ void Raw::DoClose()
bool Raw::Execute()
{
int stdout_pipe[2];
if (pipe(stdout_pipe) != 0)
if (pipe(pipes) != 0 || pipe(pipes+2) || pipe(pipes+4) )
{
Error(Fmt("Could not open pipe: %d", errno));
return false;
@ -77,8 +80,15 @@ bool Raw::Execute()
else if ( childpid == 0 )
{
// we are the child.
close(stdout_pipe[stdin_fileno]);
dup2(stdout_pipe[stdout_fileno], stdout_fileno);
close(pipes[stdout_in]);
dup2(pipes[stdout_out], stdout_fileno);
if ( stdin_towrite )
{
close(pipes[stdin_out]);
dup2(pipes[stdin_in], stdin_fileno);
}
//execv("/usr/bin/uname",test);
execl("/bin/sh", "sh", "-c", fname.c_str(), NULL);
fprintf(stderr, "Exec failed :(......\n");
@ -87,12 +97,18 @@ bool Raw::Execute()
else
{
// we are the parent
close(stdout_pipe[stdout_fileno]);
close(pipes[stdout_out]);
if ( Info().mode == MODE_STREAM )
fcntl(stdout_pipe[stdin_fileno], F_SETFL, O_NONBLOCK);
fcntl(pipes[stdout_in], F_SETFL, O_NONBLOCK);
if ( stdin_towrite )
{
close(pipes[stdin_in]);
fcntl(pipes[stdin_out], F_SETFL, O_NONBLOCK);
}
file = fdopen(stdout_pipe[stdin_fileno], "r");
file = fdopen(pipes[stdout_in], "r");
if ( file == 0 )
{
Error("Could not convert fileno to file");
@ -106,8 +122,7 @@ bool Raw::OpenInput()
{
if ( execute )
{
if ( ! Execute() )
return false;
return Execute();
}
else
{
@ -120,10 +135,6 @@ bool Raw::OpenInput()
}
}
//if ( execute && Info().mode == MODE_STREAM )
// fcntl(fileno(file), F_SETFL, O_NONBLOCK);
//fcntl(fileno(file), F_SETFD, FD_CLOEXEC);
return true;
}
@ -138,8 +149,11 @@ bool Raw::CloseInput()
Debug(DBG_INPUT, "Raw reader starting close");
#endif
if ( execute )
pclose(file);
if ( execute ) // we do not care if any of those fails. They should all be defined.
{
for ( int i = 0; i < 6; i ++ )
close(pipes[i]);
}
else
fclose(file);
@ -166,6 +180,13 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
return false;
}
map<const char*, const char*>::const_iterator it = info.config.find("stdin"); // data that is sent to the child process
if ( it != info.config.end() )
{
stdin_string = it->second;
stdin_towrite = stdin_string.length();
}
if ( num_fields != 1 )
{
Error("Filter for raw reader contains more than one field. "
@ -214,9 +235,8 @@ bool Raw::DoInit(const ReaderInfo& info, int num_fields, const Field* const* fie
}
int64_t Raw::GetLine()
int64_t Raw::GetLine(FILE* arg_file)
{
errno = 0;
uint64_t pos = 0;
@ -227,7 +247,7 @@ int64_t Raw::GetLine()
for (;;)
{
size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, file);
size_t readbytes = fread(buf+bufpos, 1, block_size-bufpos, arg_file);
pos += bufpos + readbytes;
bufpos = 0; // read full block size in next read...
@ -240,7 +260,7 @@ int64_t Raw::GetLine()
{
// we did not find it and have to search again in the next try. resize buffer....
// but first check if we encountered the file end - because if we did this was it.
if ( feof(file) != 0 )
if ( feof(arg_file) != 0 )
{
outbuf = buf;
buf = 0;
@ -291,6 +311,28 @@ int64_t Raw::GetLine()
}
// write to the stdin of the child process
void Raw::WriteToStdin()
{
assert(stdin_towrite <= stdin_string.length());
uint64_t pos = stdin_string.length() - stdin_towrite;
errno = 0;
ssize_t written = write(pipes[stdin_out], stdin_string.c_str() + pos, stdin_towrite);
stdin_towrite -= written;
if ( errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK )
{
Error(Fmt("Writing to child process stdin failed: %d. Stopping writing at position %d", errno, pos));
stdin_towrite = 0;
close(pipes[stdin_out]);
}
if ( stdin_towrite == 0 ) // send EOF when we are done.
printf("Closing %d\n", pipes[stdin_out]);
close(pipes[stdin_out]);
}
// read the entire file and send appropriate thingies back to InputMgr
bool Raw::DoUpdate()
{
@ -344,8 +386,11 @@ bool Raw::DoUpdate()
assert (NumFields() == 1);
for ( ;; )
{
int64_t length = GetLine();
//printf("Read %lld bytes", length);
if ( stdin_towrite > 0 )
WriteToStdin();
int64_t length = GetLine(file);
//printf("Read %lld bytes\n", length);
if ( length == -3 )
return false;

View file

@ -29,7 +29,9 @@ protected:
private:
bool OpenInput();
bool CloseInput();
int64_t GetLine();
int64_t GetLine(FILE* file);
bool Execute();
void WriteToStdin();
string fname; // Source with a potential "|" removed.
FILE* file;
@ -40,10 +42,9 @@ private:
// options set from the script-level.
string separator;
unsigned int sep_length; // length of the separator
bool Execute();
static const int block_size;
uint32_t bufpos;
uint64_t bufpos;
char* buf;
char* outbuf;
@ -51,7 +52,21 @@ private:
int stdout_fileno;
int stderr_fileno;
string stdin_string;
uint64_t stdin_towrite;
int pipes[6];
pid_t childpid;
enum IoChannels {
stdout_in = 0,
stdout_out = 1,
stdin_in = 2,
stdin_out = 3,
stderr_in = 4,
stderr_out = 5
};
};
}

View file

@ -0,0 +1,36 @@
[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=<no value description>, want_record=F, ev=line
{
print outfile, A::description;
print outfile, A::tpe;
print outfile, A::s;
try = try + 1;
if (2 == try)
{
Input::remove(input2);
close(outfile);
terminate();
}
}, config={
[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay
}]
Input::EVENT_NEW
hello
[source=cat |, reader=Input::READER_RAW, mode=Input::STREAM, name=input2, fields=<no value description>, want_record=F, ev=line
{
print outfile, A::description;
print outfile, A::tpe;
print outfile, A::s;
try = try + 1;
if (2 == try)
{
Input::remove(input2);
close(outfile);
terminate();
}
}, config={
[stdin] = hello^Jthere^A^B^C^D^E^A^B^Cyay
}]
Input::EVENT_NEW
there^A^B^C^D^E^A^B^Cyay

View file

@ -0,0 +1,2 @@
hello
thereyay

View file

@ -0,0 +1,44 @@
# @TEST-EXEC: btest-bg-run bro bro -b %INPUT
# @TEST-EXEC: btest-bg-wait -k 5
# @TEST-EXEC: btest-diff test.txt
# @TEST-EXEC: btest-diff out
redef exit_only_after_terminate = T;
@load base/frameworks/communication # let network-time run. otherwise there are no heartbeats...
global outfile: file;
global try: count;
module A;
type Val: record {
s: string;
};
event line(description: Input::EventDescription, tpe: Input::Event, s: string)
{
print outfile, description;
print outfile, tpe;
print outfile, s;
try = try + 1;
if ( try == 2 )
{
Input::remove("input2");
close(outfile);
terminate();
}
}
event bro_init()
{
local config_strings: table[string] of string = {
["stdin"] = "hello\nthere\1\2\3\4\5\1\2\3yay"
#["stdin"] = "yay"
};
try = 0;
outfile = open("../out");
Input::add_event([$source="cat > ../test.txt |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input", $fields=Val, $ev=line, $want_record=F, $config=config_strings]);
Input::remove("input");
Input::add_event([$source="cat |", $reader=Input::READER_RAW, $mode=Input::STREAM, $name="input2", $fields=Val, $ev=line, $want_record=F, $config=config_strings]);
}