Merge remote-tracking branch 'origin/topic/jsiwek/gh-1432-file-extraction-buffered-io'

* origin/topic/jsiwek/gh-1432-file-extraction-buffered-io:
  GH-1432: Use buffered IO for file extraction
This commit is contained in:
Jon Siwek 2021-03-10 12:29:13 -08:00
commit 6a0eae28a2
4 changed files with 70 additions and 11 deletions

View file

@ -1,4 +1,12 @@
4.1.0-dev.320 | 2021-03-10 12:29:13 -0800
* GH-1432: Use buffered IO for file extraction (Jon Siwek, Corelight)
This can improve performance significantly: ~3.5x faster when tested on
a large file passing data to the file analysis framework in small chunks
of 20 bytes.
4.1.0-dev.318 | 2021-03-10 12:05:47 -0800
* Avoid searching a directory for dynamic plugins multiple times (Jon Siwek, Corelight)

View file

@ -1 +1 @@
4.1.0-dev.318 4.1.0-dev.320

View file

@ -17,12 +17,21 @@ Extract::Extract(RecordValPtr args, file_analysis::File* file,
std::move(args), file), std::move(args), file),
filename(arg_filename), limit(arg_limit), depth(0) filename(arg_filename), limit(arg_limit), depth(0)
{ {
fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666);
if ( fd < 0 )
{
fd = 0;
char buf[128]; char buf[128];
file_stream = fopen(filename.data(), "w");
if ( file_stream )
{
// Try to ensure full buffering.
if ( setvbuf(file_stream, nullptr, _IOFBF, BUFSIZ) )
{
util::zeek_strerror_r(errno, buf, sizeof(buf));
reporter->Warning("cannot set buffering mode for %s: %s",
filename.data(), buf);
}
}
else
{
util::zeek_strerror_r(errno, buf, sizeof(buf)); util::zeek_strerror_r(errno, buf, sizeof(buf));
reporter->Error("cannot open %s: %s", filename.c_str(), buf); reporter->Error("cannot open %s: %s", filename.c_str(), buf);
} }
@ -30,8 +39,12 @@ Extract::Extract(RecordValPtr args, file_analysis::File* file,
Extract::~Extract() Extract::~Extract()
{ {
if ( fd ) if ( file_stream && fclose(file_stream) )
util::safe_close(fd); {
char buf[128];
util::zeek_strerror_r(errno, buf, sizeof(buf));
reporter->Error("cannot close %s: %s", filename.data(), buf);
}
} }
static const ValPtr& get_extract_field_val(const RecordValPtr& args, static const ValPtr& get_extract_field_val(const RecordValPtr& args,
@ -86,7 +99,7 @@ static bool check_limit_exceeded(uint64_t lim, uint64_t depth, uint64_t len, uin
bool Extract::DeliverStream(const u_char* data, uint64_t len) bool Extract::DeliverStream(const u_char* data, uint64_t len)
{ {
if ( ! fd ) if ( ! file_stream )
return false; return false;
uint64_t towrite = 0; uint64_t towrite = 0;
@ -106,21 +119,58 @@ bool Extract::DeliverStream(const u_char* data, uint64_t len)
limit_exceeded = check_limit_exceeded(limit, depth, len, &towrite); limit_exceeded = check_limit_exceeded(limit, depth, len, &towrite);
} }
char buf[128];
if ( towrite > 0 ) if ( towrite > 0 )
{ {
util::safe_write(fd, reinterpret_cast<const char*>(data), towrite); if ( fwrite(data, towrite, 1, file_stream) != 1 )
{
util::zeek_strerror_r(errno, buf, sizeof(buf));
reporter->Error("failed to write to extracted file %s: %s",
filename.data(), buf);
fclose(file_stream);
file_stream = nullptr;
return false;
}
depth += towrite; depth += towrite;
} }
// Assume we may not try to write anything more for a while due to reaching
// the extraction limit and the file analysis File still proceeding to
// do other analysis without destructing/closing this one until the very end,
// so flush anything currently buffered.
if ( limit_exceeded && fflush(file_stream) )
{
util::zeek_strerror_r(errno, buf, sizeof(buf));
reporter->Warning("cannot fflush extracted file %s: %s",
filename.data(), buf);
}
return ( ! limit_exceeded ); return ( ! limit_exceeded );
} }
bool Extract::Undelivered(uint64_t offset, uint64_t len) bool Extract::Undelivered(uint64_t offset, uint64_t len)
{ {
if ( ! file_stream )
return false;
if ( depth == offset ) if ( depth == offset )
{ {
char* tmp = new char[len](); char* tmp = new char[len]();
util::safe_write(fd, tmp, len);
if ( fwrite(tmp, len, 1, file_stream) != 1 )
{
char buf[128];
util::zeek_strerror_r(errno, buf, sizeof(buf));
reporter->Error("failed to write to extracted file %s: %s",
filename.data(), buf);
fclose(file_stream);
file_stream = nullptr;
delete [] tmp;
return false;
}
delete [] tmp; delete [] tmp;
depth += len; depth += len;
} }

View file

@ -3,6 +3,7 @@
#pragma once #pragma once
#include <string> #include <string>
#include <cstdio>
#include "zeek/Val.h" #include "zeek/Val.h"
#include "zeek/file_analysis/File.h" #include "zeek/file_analysis/File.h"
@ -72,7 +73,7 @@ protected:
private: private:
std::string filename; std::string filename;
int fd; FILE* file_stream;
uint64_t limit; uint64_t limit;
uint64_t depth; uint64_t depth;
}; };