Removing FlowSrc.

We could bring this back, now derived from PktSrc (though strickly
speaking it's of course not *packets). But not sure if we want that,
as the input framework seems the better place to host it. Then it
would turns into a reader.
This commit is contained in:
Robin Sommer 2014-08-22 16:33:55 -07:00
parent 93e6a4a9db
commit ecf1e32f60
7 changed files with 6 additions and 333 deletions

View file

@ -279,7 +279,6 @@ set(bro_SRCS
EventRegistry.cc EventRegistry.cc
Expr.cc Expr.cc
File.cc File.cc
FlowSrc.cc
Frag.cc Frag.cc
Frame.cc Frame.cc
Func.cc Func.cc

View file

@ -1,227 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Written by Bernhard Ager, TU Berlin (2006/2007).
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <netdb.h>
#include "FlowSrc.h"
#include "Net.h"
#include "analyzer/protocol/netflow/netflow_pac.h"
#include <errno.h>
FlowSrc::FlowSrc()
{ // TODO: v9.
selectable_fd = -1;
data = 0;
pdu_len = -1;
exporter_ip = 0;
current_timestamp = next_timestamp = 0.0;
netflow_analyzer = new binpac::NetFlow::NetFlow_Analyzer();
}
FlowSrc::~FlowSrc()
{
delete netflow_analyzer;
}
void FlowSrc::GetFds(int* read, int* write, int* except)
{
if ( selectable_fd >= 0 )
*read = selectable_fd;
}
double FlowSrc::NextTimestamp(double* network_time)
{
if ( ! data && ! ExtractNextPDU() )
return -1.0;
else
return next_timestamp;
}
void FlowSrc::Process()
{
if ( ! data && ! ExtractNextPDU() )
return;
// This is normally done by calling net_packet_dispatch(),
// but as we don't have a packet to dispatch ...
net_update_time(next_timestamp);
expire_timers();
netflow_analyzer->downflow()->set_exporter_ip(exporter_ip);
// We handle exceptions in NewData (might have changed w/ new binpac).
netflow_analyzer->NewData(0, data, data + pdu_len);
data = 0;
}
void FlowSrc::Close()
{
safe_close(selectable_fd);
}
FlowSocketSrc::~FlowSocketSrc()
{
}
int FlowSocketSrc::ExtractNextPDU()
{
sockaddr_in from;
socklen_t fromlen = sizeof(from);
pdu_len = recvfrom(selectable_fd, buffer, NF_MAX_PKT_SIZE, 0,
(struct sockaddr*) &from, &fromlen);
if ( pdu_len < 0 )
{
reporter->Error("problem reading NetFlow data from socket");
data = 0;
next_timestamp = -1.0;
SetClosed(true);
return 0;
}
if ( fromlen != sizeof(from) )
{
reporter->Error("malformed NetFlow PDU");
return 0;
}
data = buffer;
exporter_ip = from.sin_addr.s_addr;
next_timestamp = current_time();
if ( next_timestamp < current_timestamp )
next_timestamp = current_timestamp;
else
current_timestamp = next_timestamp;
return 1;
}
FlowSocketSrc::FlowSocketSrc(const char* listen_parms)
{
int n = strlen(listen_parms) + 1;
char laddr[n], port[n], ident[n];
laddr[0] = port[0] = ident[0] = '\0';
int ret = sscanf(listen_parms, "%[^:]:%[^=]=%s", laddr, port, ident);
if ( ret < 2 )
{
snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE,
"parsing your listen-spec went nuts: laddr='%s', port='%s'\n",
laddr[0] ? laddr : "", port[0] ? port : "");
SetClosed(true);
return;
}
const char* id = (ret == 3) ? ident : listen_parms;
netflow_analyzer->downflow()->set_identifier(id);
struct addrinfo aiprefs = {
0, PF_INET, SOCK_DGRAM, IPPROTO_UDP, 0, NULL, NULL, NULL
};
struct addrinfo* ainfo = 0;
if ( (ret = getaddrinfo(laddr, port, &aiprefs, &ainfo)) != 0 )
{
snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE,
"getaddrinfo(%s, %s, ...): %s",
laddr, port, gai_strerror(ret));
SetClosed(true);
return;
}
if ( (selectable_fd = socket (PF_INET, SOCK_DGRAM, 0)) < 0 )
{
snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE,
"socket: %s", strerror(errno));
SetClosed(true);
goto cleanup;
}
if ( bind (selectable_fd, ainfo->ai_addr, ainfo->ai_addrlen) < 0 )
{
snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE,
"bind: %s", strerror(errno));
SetClosed(true);
goto cleanup;
}
cleanup:
freeaddrinfo(ainfo);
}
FlowFileSrc::~FlowFileSrc()
{
delete [] readfile;
}
int FlowFileSrc::ExtractNextPDU()
{
FlowFileSrcPDUHeader pdu_header;
if ( read(selectable_fd, &pdu_header, sizeof(pdu_header)) <
int(sizeof(pdu_header)) )
return Error(errno, "read header");
if ( pdu_header.pdu_length > NF_MAX_PKT_SIZE )
{
reporter->Error("NetFlow packet too long");
// Safely skip over the too-long PDU.
if ( lseek(selectable_fd, pdu_header.pdu_length, SEEK_CUR) < 0 )
return Error(errno, "lseek");
return 0;
}
if ( read(selectable_fd, buffer, pdu_header.pdu_length) <
pdu_header.pdu_length )
return Error(errno, "read data");
if ( next_timestamp < pdu_header.network_time )
{
next_timestamp = pdu_header.network_time;
current_timestamp = pdu_header.network_time;
}
else
current_timestamp = next_timestamp;
data = buffer;
pdu_len = pdu_header.pdu_length;
exporter_ip = pdu_header.ipaddr;
return 1;
}
FlowFileSrc::FlowFileSrc(const char* readfile)
{
int n = strlen(readfile) + 1;
char ident[n];
this->readfile = new char[n];
int ret = sscanf(readfile, "%[^=]=%s", this->readfile, ident);
const char* id = (ret == 2) ? ident : this->readfile;
netflow_analyzer->downflow()->set_identifier(id);
selectable_fd = open(this->readfile, O_RDONLY);
if ( selectable_fd < 0 )
{
SetClosed(true);
snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE,
"open: %s", strerror(errno));
}
}
int FlowFileSrc::Error(int errlvl, const char* errmsg)
{
snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE,
"%s: %s", errmsg, strerror(errlvl));
data = 0;
next_timestamp = -1.0;
SetClosed(true);
return 0;
}

View file

@ -1,84 +0,0 @@
// See the file "COPYING" in the main distribution directory for copyright.
//
// Written by Bernhard Ager, TU Berlin (2006/2007).
#ifndef flowsrc_h
#define flowsrc_h
#include "iosource/IOSource.h"
#include "NetVar.h"
#include "binpac.h"
#define BRO_FLOW_ERRBUF_SIZE 512
// TODO: 1500 is enough for v5 - how about the others?
// 65536 would be enough for any UDP packet.
#define NF_MAX_PKT_SIZE 8192
struct FlowFileSrcPDUHeader {
double network_time;
int pdu_length;
uint32 ipaddr;
};
// Avoid including netflow_pac.h by explicitly declaring the NetFlow_Analyzer.
namespace binpac {
namespace NetFlow {
class NetFlow_Analyzer;
}
}
class FlowSrc : public iosource::IOSource {
public:
virtual ~FlowSrc();
// IOSource interface:
bool IsReady();
void GetFds(int* read, int* write, int* except);
double NextTimestamp(double* network_time);
void Process();
const char* Tag() { return "FlowSrc"; }
const char* ErrorMsg() const { return errbuf; }
protected:
FlowSrc();
virtual int ExtractNextPDU() = 0;
virtual void Close();
int selectable_fd;
double current_timestamp;
double next_timestamp;
binpac::NetFlow::NetFlow_Analyzer* netflow_analyzer;
u_char buffer[NF_MAX_PKT_SIZE];
u_char* data;
int pdu_len;
uint32 exporter_ip; // in network byte order
char errbuf[BRO_FLOW_ERRBUF_SIZE];
};
class FlowSocketSrc : public FlowSrc {
public:
FlowSocketSrc(const char* listen_parms);
virtual ~FlowSocketSrc();
int ExtractNextPDU();
};
class FlowFileSrc : public FlowSrc {
public:
FlowFileSrc(const char* readfile);
~FlowFileSrc();
int ExtractNextPDU();
protected:
int Error(int errlvl, const char* errmsg);
char* readfile;
};
#endif

View file

@ -154,7 +154,6 @@ void net_update_time(double new_network_time)
} }
void net_init(name_list& interfaces, name_list& readfiles, void net_init(name_list& interfaces, name_list& readfiles,
name_list& netflows, name_list& flowfiles,
const char* writefile, const char* filter, const char* writefile, const char* filter,
int do_watchdog) int do_watchdog)
{ {

View file

@ -6,7 +6,6 @@
#include "net_util.h" #include "net_util.h"
#include "util.h" #include "util.h"
#include "List.h" #include "List.h"
#include "FlowSrc.h"
#include "Func.h" #include "Func.h"
#include "RemoteSerializer.h" #include "RemoteSerializer.h"
#include "iosource/IOSource.h" #include "iosource/IOSource.h"
@ -14,7 +13,6 @@
#include "iosource/pktsrc/PktDumper.h" #include "iosource/pktsrc/PktDumper.h"
extern void net_init(name_list& interfaces, name_list& readfiles, extern void net_init(name_list& interfaces, name_list& readfiles,
name_list& netflows, name_list& flowfiles,
const char* writefile, const char* filter, const char* writefile, const char* filter,
int do_watchdog); int do_watchdog);
extern void net_run(); extern void net_run();

View file

@ -4,6 +4,7 @@
#include "Analyzer.h" #include "Analyzer.h"
#include "Manager.h" #include "Manager.h"
#include "binpac.h"
#include "analyzer/protocol/pia/PIA.h" #include "analyzer/protocol/pia/PIA.h"
#include "../Event.h" #include "../Event.h"

View file

@ -449,8 +449,6 @@ int main(int argc, char** argv)
name_list interfaces; name_list interfaces;
name_list read_files; name_list read_files;
name_list netflows;
name_list flow_files;
name_list rule_files; name_list rule_files;
char* bst_file = 0; char* bst_file = 0;
char* id_name = 0; char* id_name = 0;
@ -552,7 +550,7 @@ int main(int argc, char** argv)
opterr = 0; opterr = 0;
char opts[256]; char opts[256];
safe_strncpy(opts, "B:D:e:f:I:i:K:l:n:p:R:r:s:T:t:U:w:x:X:y:Y:z:CFGLNOPSWabdghvZQ", safe_strncpy(opts, "B:D:e:f:I:i:K:l:n:p:R:r:s:T:t:U:w:x:X:z:CFGLNOPSWabdghvZQ",
sizeof(opts)); sizeof(opts));
#ifdef USE_PERFTOOLS_DEBUG #ifdef USE_PERFTOOLS_DEBUG
@ -612,10 +610,6 @@ int main(int argc, char** argv)
writefile = optarg; writefile = optarg;
break; break;
case 'y':
flow_files.append(optarg);
break;
case 'z': case 'z':
if ( streq(optarg, "notice") ) if ( streq(optarg, "notice") )
do_notice_analysis = 1; do_notice_analysis = 1;
@ -709,10 +703,6 @@ int main(int argc, char** argv)
do_watchdog = 1; do_watchdog = 1;
break; break;
case 'Y':
netflows.append(optarg);
break;
case 'h': case 'h':
usage(); usage();
break; break;
@ -800,8 +790,7 @@ int main(int argc, char** argv)
// seed the PRNG. We should do this here (but at least Linux, FreeBSD // seed the PRNG. We should do this here (but at least Linux, FreeBSD
// and Solaris provide /dev/urandom). // and Solaris provide /dev/urandom).
if ( (interfaces.length() > 0 || netflows.length() > 0) && if ( interfaces.length() > 0 && read_files.length() > 0 )
(read_files.length() > 0 || flow_files.length() > 0 ))
usage(); usage();
#ifdef USE_IDMEF #ifdef USE_IDMEF
@ -824,7 +813,7 @@ int main(int argc, char** argv)
plugin_mgr->SearchDynamicPlugins(bro_plugin_path()); plugin_mgr->SearchDynamicPlugins(bro_plugin_path());
if ( optind == argc && if ( optind == argc &&
read_files.length() == 0 && flow_files.length() == 0 && read_files.length() == 0 &&
interfaces.length() == 0 && interfaces.length() == 0 &&
! (id_name || bst_file) && ! command_line_policy && ! print_plugins ) ! (id_name || bst_file) && ! command_line_policy && ! print_plugins )
add_input_file("-"); add_input_file("-");
@ -983,8 +972,7 @@ int main(int argc, char** argv)
// ### Add support for debug command file. // ### Add support for debug command file.
dbg_init_debugger(0); dbg_init_debugger(0);
if ( (flow_files.length() == 0 || read_files.length() == 0) && if ( read_files.length() == 0 && interfaces.length() == 0 )
(netflows.length() == 0 || interfaces.length() == 0) )
{ {
Val* interfaces_val = internal_val("interfaces"); Val* interfaces_val = internal_val("interfaces");
if ( interfaces_val ) if ( interfaces_val )
@ -1002,8 +990,7 @@ int main(int argc, char** argv)
snaplen = internal_val("snaplen")->AsCount(); snaplen = internal_val("snaplen")->AsCount();
if ( dns_type != DNS_PRIME ) if ( dns_type != DNS_PRIME )
net_init(interfaces, read_files, netflows, flow_files, net_init(interfaces, read_files, writefile, "", do_watchdog);
writefile, "", do_watchdog);
BroFile::SetDefaultRotation(log_rotate_interval, log_max_size); BroFile::SetDefaultRotation(log_rotate_interval, log_max_size);