diff --git a/TODO.iosources b/TODO.iosources new file mode 100644 index 0000000000..7380c89b92 --- /dev/null +++ b/TODO.iosources @@ -0,0 +1,9 @@ +- Move the current_{iosrc,pkt_src,etc.} into manager +- Remove all 2ndary path code +- Remove all flow src code. +- Move pktsrc/*.{h,cc} up a level? Or create a subsublibrary there? +- Create a global Packet data structure and pass that around instead + of the pcap_* stuff? +- PktDumper: Move Dump() to public and remove Record() +- Wrap BPF_Program into namespace and clean up +- Tests, in particular the packet dumping needs testing. diff --git a/aux/broccoli b/aux/broccoli index e02ccc0a27..17ec437752 160000 --- a/aux/broccoli +++ b/aux/broccoli @@ -1 +1 @@ -Subproject commit e02ccc0a27e64b147f01e4c7deb5b897864d59d5 +Subproject commit 17ec437752837fb4214abfb0a2da49df74668d5d diff --git a/aux/broctl b/aux/broctl index 2e07720b4f..6e01d6972f 160000 --- a/aux/broctl +++ b/aux/broctl @@ -1 +1 @@ -Subproject commit 2e07720b4f129802e07ca99498e2aff4542c737a +Subproject commit 6e01d6972f02d68ee82d05f392d1a00725595b7f diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8e22b504e4..1aede44934 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -151,10 +151,12 @@ list(APPEND BINPAC_OUTPUTS "${BINPAC_OUTPUT_CC}") set(bro_SUBDIR_LIBS CACHE INTERNAL "subdir libraries" FORCE) set(bro_PLUGIN_LIBS CACHE INTERNAL "plugin libraries" FORCE) +add_subdirectory(iosource) add_subdirectory(analyzer) add_subdirectory(file_analysis) add_subdirectory(probabilistic) add_subdirectory(broxygen) +add_subdirectory(iosource) set(bro_SUBDIRS ${bro_SUBDIR_LIBS} @@ -249,7 +251,8 @@ set(bro_SRCS Anon.cc Attr.cc Base64.cc - BPF_Program.cc + BroDoc.cc + BroDocObj.cc Brofiler.cc BroString.cc CCL.cc @@ -281,7 +284,6 @@ set(bro_SRCS Hash.cc ID.cc IntSet.cc - IOSource.cc IP.cc IPAddr.cc List.cc @@ -295,7 +297,6 @@ set(bro_SRCS PacketFilter.cc PacketSort.cc PersistenceSerializer.cc - PktSrc.cc PolicyFile.cc PrefixTable.cc PriorityQueue.cc diff --git a/src/DNS_Mgr.cc b/src/DNS_Mgr.cc index 17409a930b..4d96e21a3e 100644 --- a/src/DNS_Mgr.cc +++ b/src/DNS_Mgr.cc @@ -34,6 +34,7 @@ #include "Net.h" #include "Var.h" #include "Reporter.h" +#include "iosource/Manager.h" extern "C" { extern int select(int, fd_set *, fd_set *, fd_set *, struct timeval *); @@ -404,17 +405,17 @@ DNS_Mgr::~DNS_Mgr() delete [] dir; } -bool DNS_Mgr::Init() +void DNS_Mgr::Init() { if ( did_init ) - return true; + return; const char* cache_dir = dir ? dir : "."; if ( mode == DNS_PRIME && ! ensure_dir(cache_dir) ) { did_init = 0; - return false; + return; } cache_name = new char[strlen(cache_dir) + 64]; @@ -433,14 +434,12 @@ bool DNS_Mgr::Init() did_init = 1; - io_sources.Register(this, true); + iosource_mgr->Register(this, true); // We never set idle to false, having the main loop only calling us from // time to time. If we're issuing more DNS requests than we can handle // in this way, we are having problems anyway ... - idle = true; - - return true; + SetIdle(true); } TableVal* DNS_Mgr::LookupHost(const char* name) diff --git a/src/DNS_Mgr.h b/src/DNS_Mgr.h index bfcc70a5c2..069c7e1a2b 100644 --- a/src/DNS_Mgr.h +++ b/src/DNS_Mgr.h @@ -12,7 +12,7 @@ #include "BroList.h" #include "Dict.h" #include "EventHandler.h" -#include "IOSource.h" +#include "iosource/IOSource.h" #include "IPAddr.h" class Val; @@ -40,12 +40,12 @@ enum DNS_MgrMode { // Number of seconds we'll wait for a reply. #define DNS_TIMEOUT 5 -class DNS_Mgr : public IOSource { +class DNS_Mgr : public iosource::IOSource { public: DNS_Mgr(DNS_MgrMode mode); virtual ~DNS_Mgr(); - bool Init(); + void Init(); void Flush(); // Looks up the address or addresses of the given host, and returns diff --git a/src/DebugLogger.cc b/src/DebugLogger.cc index 78377eafcf..05f0f9e89f 100644 --- a/src/DebugLogger.cc +++ b/src/DebugLogger.cc @@ -17,7 +17,8 @@ DebugLogger::Stream DebugLogger::streams[NUM_DBGS] = { { "dpd", 0, false }, { "tm", 0, false }, { "logging", 0, false }, {"input", 0, false }, { "threading", 0, false }, { "file_analysis", 0, false }, - { "plugins", 0, false }, { "broxygen", 0, false } + { "plugins", 0, false }, { "broxygen", 0, false }, + { "pktio", 0, false} }; DebugLogger::DebugLogger(const char* filename) diff --git a/src/DebugLogger.h b/src/DebugLogger.h index d1f053788e..b098430a9a 100644 --- a/src/DebugLogger.h +++ b/src/DebugLogger.h @@ -29,6 +29,7 @@ enum DebugStream { DBG_FILE_ANALYSIS, // File analysis DBG_PLUGINS, DBG_BROXYGEN, + DBG_PKTIO, // Packet sources and dumpers. NUM_DBGS // Has to be last }; diff --git a/src/FlowSrc.cc b/src/FlowSrc.cc index 32aa4c4e3a..fb9676ab5b 100644 --- a/src/FlowSrc.cc +++ b/src/FlowSrc.cc @@ -15,7 +15,6 @@ FlowSrc::FlowSrc() { // TODO: v9. selectable_fd = -1; - idle = false; data = 0; pdu_len = -1; exporter_ip = 0; @@ -80,7 +79,7 @@ int FlowSocketSrc::ExtractNextPDU() reporter->Error("problem reading NetFlow data from socket"); data = 0; next_timestamp = -1.0; - closed = 1; + SetClosed(true); return 0; } @@ -115,7 +114,7 @@ FlowSocketSrc::FlowSocketSrc(const char* listen_parms) snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE, "parsing your listen-spec went nuts: laddr='%s', port='%s'\n", laddr[0] ? laddr : "", port[0] ? port : ""); - closed = 1; + SetClosed(true); return; } @@ -131,7 +130,7 @@ FlowSocketSrc::FlowSocketSrc(const char* listen_parms) snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE, "getaddrinfo(%s, %s, ...): %s", laddr, port, gai_strerror(ret)); - closed = 1; + SetClosed(true); return; } @@ -139,7 +138,7 @@ FlowSocketSrc::FlowSocketSrc(const char* listen_parms) { snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE, "socket: %s", strerror(errno)); - closed = 1; + SetClosed(true); goto cleanup; } @@ -147,7 +146,7 @@ FlowSocketSrc::FlowSocketSrc(const char* listen_parms) { snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE, "bind: %s", strerror(errno)); - closed = 1; + SetClosed(true); goto cleanup; } @@ -211,7 +210,7 @@ FlowFileSrc::FlowFileSrc(const char* readfile) selectable_fd = open(this->readfile, O_RDONLY); if ( selectable_fd < 0 ) { - closed = 1; + SetClosed(true); snprintf(errbuf, BRO_FLOW_ERRBUF_SIZE, "open: %s", strerror(errno)); } @@ -223,6 +222,6 @@ int FlowFileSrc::Error(int errlvl, const char* errmsg) "%s: %s", errmsg, strerror(errlvl)); data = 0; next_timestamp = -1.0; - closed = 1; + SetClosed(true); return 0; } diff --git a/src/FlowSrc.h b/src/FlowSrc.h index 03dda2761d..71c8b0cd11 100644 --- a/src/FlowSrc.h +++ b/src/FlowSrc.h @@ -5,7 +5,7 @@ #ifndef flowsrc_h #define flowsrc_h -#include "IOSource.h" +#include "iosource/IOSource.h" #include "NetVar.h" #include "binpac.h" @@ -28,7 +28,7 @@ namespace binpac { } } -class FlowSrc : public IOSource { +class FlowSrc : public iosource::IOSource { public: virtual ~FlowSrc(); diff --git a/src/Net.cc b/src/Net.cc index ac4dacf9b8..64baeff4aa 100644 --- a/src/Net.cc +++ b/src/Net.cc @@ -30,6 +30,9 @@ #include "PacketSort.h" #include "Serializer.h" #include "PacketDumper.h" +#include "iosource/Manager.h" +#include "iosource/pktsrc/PktSrc.h" +#include "iosource/pktsrc/PktDumper.h" extern "C" { #include "setsignal.h" @@ -39,10 +42,7 @@ extern "C" { extern int select(int, fd_set *, fd_set *, fd_set *, struct timeval *); } -PList(PktSrc) pkt_srcs; - -// FIXME: We should really merge PktDumper and PacketDumper. -PktDumper* pkt_dumper = 0; +iosource::PktDumper* pkt_dumper = 0; int reading_live = 0; int reading_traces = 0; @@ -65,8 +65,8 @@ const u_char* current_pkt = 0; int current_dispatched = 0; int current_hdr_size = 0; double current_timestamp = 0.0; -PktSrc* current_pktsrc = 0; -IOSource* current_iosrc; +iosource::PktSrc* current_pktsrc = 0; +iosource::IOSource* current_iosrc = 0; std::list files_scanned; std::vector sig_files; @@ -115,8 +115,8 @@ RETSIGTYPE watchdog(int /* signo */) // saving the packet which caused the // watchdog to trigger may be helpful, // so we'll save that one nevertheless. - pkt_dumper = new PktDumper("watchdog-pkt.pcap"); - if ( pkt_dumper->IsError() ) + pkt_dumper = iosource_mgr->OpenPktDumper("watchdog-pkt.pcap", false); + if ( ! pkt_dumper || pkt_dumper->IsError() ) { reporter->Error("watchdog: can't open watchdog-pkt.pcap for writing\n"); delete pkt_dumper; @@ -125,7 +125,12 @@ RETSIGTYPE watchdog(int /* signo */) } if ( pkt_dumper ) - pkt_dumper->Dump(current_hdr, current_pkt); + { + iosource::PktDumper::Packet p; + p.hdr = current_hdr; + p.data = current_pkt; + pkt_dumper->Record(&p); + } } net_get_final_stats(); @@ -157,18 +162,10 @@ void net_init(name_list& interfaces, name_list& readfiles, reading_traces = 1; for ( int i = 0; i < readfiles.length(); ++i ) - { - PktFileSrc* ps = new PktFileSrc(readfiles[i], filter); - - if ( ! ps->IsOpen() ) - reporter->FatalError("%s: problem with trace file %s - %s\n", - prog, readfiles[i], ps->ErrorMsg()); - else - { - pkt_srcs.append(ps); - io_sources.Register(ps); - } + iosource_mgr->OpenPktSrc(readfiles[i], filter, false); + } +#if 0 if ( secondary_filter ) { // We use a second PktFileSrc for the @@ -189,7 +186,6 @@ void net_init(name_list& interfaces, name_list& readfiles, ps->AddSecondaryTablePrograms(); } - } for ( int i = 0; i < flowfiles.length(); ++i ) { @@ -203,7 +199,7 @@ void net_init(name_list& interfaces, name_list& readfiles, io_sources.Register(fs); } } - } +#endif else if ((interfaces.length() > 0 || netflows.length() > 0)) { @@ -211,22 +207,13 @@ void net_init(name_list& interfaces, name_list& readfiles, reading_traces = 0; for ( int i = 0; i < interfaces.length(); ++i ) - { - PktSrc* ps; - ps = new PktInterfaceSrc(interfaces[i], filter); - - if ( ! ps->IsOpen() ) - reporter->FatalError("%s: problem with interface %s - %s\n", - prog, interfaces[i], ps->ErrorMsg()); - else - { - pkt_srcs.append(ps); - io_sources.Register(ps); - } + iosource_mgr->OpenPktSrc(interfaces[i], filter, true); + } +#if 0 if ( secondary_filter ) { - PktSrc* ps; + iosource::PktSrc* ps; ps = new PktInterfaceSrc(interfaces[i], filter, TYPE_FILTER_SECONDARY); @@ -258,8 +245,7 @@ void net_init(name_list& interfaces, name_list& readfiles, else io_sources.Register(fs); } - - } +#endif else // have_pending_timers = 1, possibly. We don't set @@ -270,12 +256,7 @@ void net_init(name_list& interfaces, name_list& readfiles, if ( writefile ) { - // ### This will fail horribly if there are multiple - // interfaces with different-lengthed media. - pkt_dumper = new PktDumper(writefile); - if ( pkt_dumper->IsError() ) - reporter->FatalError("%s: can't open write file \"%s\" - %s\n", - prog, writefile, pkt_dumper->ErrorMsg()); + pkt_dumper = iosource_mgr->OpenPktDumper(writefile, false); ID* id = global_scope()->Lookup("trace_output_file"); if ( ! id ) @@ -299,7 +280,7 @@ void net_init(name_list& interfaces, name_list& readfiles, } } -void expire_timers(PktSrc* src_ps) +void expire_timers(iosource::PktSrc* src_ps) { SegmentProfiler(segment_logger, "expiring-timers"); TimerMgr* tmgr = @@ -313,7 +294,7 @@ void expire_timers(PktSrc* src_ps) void net_packet_dispatch(double t, const struct pcap_pkthdr* hdr, const u_char* pkt, int hdr_size, - PktSrc* src_ps, PacketSortElement* pkt_elem) + iosource::PktSrc* src_ps, PacketSortElement* pkt_elem) { if ( ! bro_start_network_time ) bro_start_network_time = t; @@ -394,7 +375,7 @@ int process_packet_sorter(double latest_packet_time) void net_packet_arrival(double t, const struct pcap_pkthdr* hdr, const u_char* pkt, int hdr_size, - PktSrc* src_ps) + iosource::PktSrc* src_ps) { if ( packet_sorter ) { @@ -421,12 +402,12 @@ void net_run() { set_processing_status("RUNNING", "net_run"); - while ( io_sources.Size() || + while ( iosource_mgr->Size() || (packet_sorter && ! packet_sorter->Empty()) || (BifConst::exit_only_after_terminate && ! terminating) ) { double ts; - IOSource* src = io_sources.FindSoonest(&ts); + iosource::IOSource* src = iosource_mgr->FindSoonest(&ts); #ifdef DEBUG static int loop_counter = 0; @@ -535,16 +516,19 @@ void net_run() void net_get_final_stats() { - loop_over_list(pkt_srcs, i) + const iosource::Manager::PktSrcList& pkt_srcs(iosource_mgr->GetPktSrcs()); + + for ( iosource::Manager::PktSrcList::const_iterator i = pkt_srcs.begin(); + i != pkt_srcs.end(); i++ ) { - PktSrc* ps = pkt_srcs[i]; + iosource::PktSrc* ps = *i; if ( ps->IsLive() ) { - struct PktSrc::Stats s; + iosource::PktSrc::Stats s; ps->Statistics(&s); reporter->Info("%d packets received on interface %s, %d dropped\n", - s.received, ps->Interface(), s.dropped); + s.received, ps->Path().c_str(), s.dropped); } } } @@ -587,29 +571,6 @@ void net_delete() delete ip_anonymizer[i]; } -// net_packet_match -// -// Description: -// - Checks if a packet matches a filter. It just wraps up a call to -// [pcap.h's] bpf_filter(). -// -// Inputs: -// - fp: a BPF-compiled filter -// - pkt: a pointer to the packet -// - len: the original packet length -// - caplen: the captured packet length. This is pkt length -// -// Output: -// - return: 1 if the packet matches the filter, 0 otherwise - -int net_packet_match(BPF_Program* fp, const u_char* pkt, - u_int len, u_int caplen) - { - // NOTE: I don't like too much un-const'ing the pkt variable. - return bpf_filter(fp->GetProgram()->bf_insns, (u_char*) pkt, len, caplen); - } - - int _processing_suspended = 0; static double suspend_start = 0; @@ -627,8 +588,12 @@ void net_continue_processing() if ( _processing_suspended == 1 ) { reporter->Info("processing continued"); - loop_over_list(pkt_srcs, i) - pkt_srcs[i]->ContinueAfterSuspend(); + + const iosource::Manager::PktSrcList& pkt_srcs(iosource_mgr->GetPktSrcs()); + + for ( iosource::Manager::PktSrcList::const_iterator i = pkt_srcs.begin(); + i != pkt_srcs.end(); i++ ) + (*i)->ContinueAfterSuspend(); } --_processing_suspended; diff --git a/src/Net.h b/src/Net.h index 07c856d1dd..421bee5911 100644 --- a/src/Net.h +++ b/src/Net.h @@ -4,13 +4,18 @@ #define net_h #include "net_util.h" +<<<<<<< HEAD #include "util.h" #include "BPF_Program.h" +======= +>>>>>>> 5493253... Checkpoint. #include "List.h" -#include "PktSrc.h" #include "FlowSrc.h" #include "Func.h" #include "RemoteSerializer.h" +#include "iosource/IOSource.h" +#include "iosource/pktsrc/PktSrc.h" +#include "iosource/pktsrc/PktDumper.h" extern void net_init(name_list& interfaces, name_list& readfiles, name_list& netflows, name_list& flowfiles, @@ -22,10 +27,8 @@ extern void net_finish(int drain_events); extern void net_delete(); // Reclaim all memory, etc. extern void net_packet_arrival(double t, const struct pcap_pkthdr* hdr, const u_char* pkt, int hdr_size, - PktSrc* src_ps); -extern int net_packet_match(BPF_Program* fp, const u_char* pkt, - u_int len, u_int caplen); -extern void expire_timers(PktSrc* src_ps = 0); + iosource::PktSrc* src_ps); +extern void expire_timers(iosource::PktSrc* src_ps = 0); extern void termination_signal(); // Functions to temporarily suspend processing of live input (network packets @@ -82,13 +85,10 @@ extern const u_char* current_pkt; extern int current_dispatched; extern int current_hdr_size; extern double current_timestamp; -extern PktSrc* current_pktsrc; -extern IOSource* current_iosrc; +extern iosource::PktSrc* current_pktsrc; +extern iosource::IOSource* current_iosrc; -declare(PList,PktSrc); -extern PList(PktSrc) pkt_srcs; - -extern PktDumper* pkt_dumper; // where to save packets +extern iosource::PktDumper* pkt_dumper; // where to save packets extern char* writefile; diff --git a/src/PacketSort.cc b/src/PacketSort.cc index 429d8e2720..606d21b689 100644 --- a/src/PacketSort.cc +++ b/src/PacketSort.cc @@ -3,7 +3,7 @@ const bool DEBUG_packetsort = false; -PacketSortElement::PacketSortElement(PktSrc* arg_src, +PacketSortElement::PacketSortElement(iosource::PktSrc* arg_src, double arg_timestamp, const struct pcap_pkthdr* arg_hdr, const u_char* arg_pkt, int arg_hdr_size) { diff --git a/src/PacketSort.h b/src/PacketSort.h index 199da0732f..d61f66994e 100644 --- a/src/PacketSort.h +++ b/src/PacketSort.h @@ -16,16 +16,14 @@ enum { NUM_OF_PQ_LEVEL, }; -class PktSrc; - class PacketSortElement { public: - PacketSortElement(PktSrc* src, double timestamp, + PacketSortElement(iosource::PktSrc* src, double timestamp, const struct pcap_pkthdr* hdr, const u_char* pkt, int hdr_size); ~PacketSortElement(); - PktSrc* Src() const { return src; } + iosource::PktSrc* Src() const { return src; } double TimeStamp() const { return timestamp; } const struct pcap_pkthdr* Hdr() const { return &hdr; } const u_char* Pkt() const { return pkt; } @@ -33,7 +31,7 @@ public: const IP_Hdr* IPHdr() const { return ip_hdr; } protected: - PktSrc* src; + iosource::PktSrc* src; double timestamp; struct pcap_pkthdr hdr; u_char* pkt; diff --git a/src/PktSrc.cc b/src/PktSrc.cc deleted file mode 100644 index 9d6bce6fe9..0000000000 --- a/src/PktSrc.cc +++ /dev/null @@ -1,796 +0,0 @@ -// See the file "COPYING" in the main distribution directory for copyright. - -#include -#include - -#include "config.h" - -#include "util.h" -#include "PktSrc.h" -#include "Hash.h" -#include "Net.h" -#include "Sessions.h" - - -// ### This needs auto-confing. -#ifdef HAVE_PCAP_INT_H -#include -#endif - -PktSrc::PktSrc() - { - interface = readfile = 0; - data = last_data = 0; - memset(&hdr, 0, sizeof(hdr)); - hdr_size = 0; - datalink = 0; - netmask = 0xffffff00; - pd = 0; - idle = false; - - next_sync_point = 0; - first_timestamp = current_timestamp = next_timestamp = 0.0; - first_wallclock = current_wallclock = 0; - - stats.received = stats.dropped = stats.link = 0; - } - -PktSrc::~PktSrc() - { - Close(); - - loop_over_list(program_list, i) - delete program_list[i]; - - BPF_Program* code; - IterCookie* cookie = filters.InitForIteration(); - while ( (code = filters.NextEntry(cookie)) ) - delete code; - - delete [] interface; - delete [] readfile; - } - -void PktSrc::GetFds(int* read, int* write, int* except) - { - if ( pseudo_realtime ) - { - // Select would give erroneous results. But we simulate it - // by setting idle accordingly. - idle = CheckPseudoTime() == 0; - return; - } - - if ( selectable_fd >= 0 ) - *read = selectable_fd; - } - -int PktSrc::ExtractNextPacket() - { - // Don't return any packets if processing is suspended (except for the - // very first packet which we need to set up times). - if ( net_is_processing_suspended() && first_timestamp ) - { - idle = true; - return 0; - } - - data = last_data = pcap_next(pd, &hdr); - - if ( data && (hdr.len == 0 || hdr.caplen == 0) ) - { - sessions->Weird("empty_pcap_header", &hdr, data); - return 0; - } - - if ( data ) - next_timestamp = hdr.ts.tv_sec + double(hdr.ts.tv_usec) / 1e6; - - if ( pseudo_realtime ) - current_wallclock = current_time(true); - - if ( ! first_timestamp ) - first_timestamp = next_timestamp; - - idle = (data == 0); - - if ( data ) - ++stats.received; - - // Source has gone dry. If it's a network interface, this just means - // it's timed out. If it's a file, though, then the file has been - // exhausted. - if ( ! data && ! IsLive() ) - { - closed = true; - - if ( pseudo_realtime && using_communication ) - { - if ( remote_trace_sync_interval ) - remote_serializer->SendFinalSyncPoint(); - else - remote_serializer->Terminate(); - } - } - - return data != 0; - } - -double PktSrc::NextTimestamp(double* local_network_time) - { - if ( ! data && ! ExtractNextPacket() ) - return -1.0; - - if ( pseudo_realtime ) - { - // Delay packet if necessary. - double packet_time = CheckPseudoTime(); - if ( packet_time ) - return packet_time; - - idle = true; - return -1.0; - } - - return next_timestamp; - } - -void PktSrc::ContinueAfterSuspend() - { - current_wallclock = current_time(true); - } - -double PktSrc::CurrentPacketWallClock() - { - // We stop time when we are suspended. - if ( net_is_processing_suspended() ) - current_wallclock = current_time(true); - - return current_wallclock; - } - -double PktSrc::CheckPseudoTime() - { - if ( ! data && ! ExtractNextPacket() ) - return 0; - - if ( ! current_timestamp ) - return bro_start_time; - - if ( remote_trace_sync_interval ) - { - if ( next_sync_point == 0 || next_timestamp >= next_sync_point ) - { - int n = remote_serializer->SendSyncPoint(); - next_sync_point = first_timestamp + - n * remote_trace_sync_interval; - remote_serializer->Log(RemoteSerializer::LogInfo, - fmt("stopping at packet %.6f, next sync-point at %.6f", - current_timestamp, next_sync_point)); - - return 0; - } - } - - double pseudo_time = next_timestamp - first_timestamp; - double ct = (current_time(true) - first_wallclock) * pseudo_realtime; - - return pseudo_time <= ct ? bro_start_time + pseudo_time : 0; - } - -void PktSrc::Process() - { - if ( ! data && ! ExtractNextPacket() ) - return; - - current_timestamp = next_timestamp; - - int pkt_hdr_size = hdr_size; - - // Unfortunately some packets on the link might have MPLS labels - // while others don't. That means we need to ask the link-layer if - // labels are in place. - bool have_mpls = false; - - int protocol = 0; - - switch ( datalink ) { - case DLT_NULL: - { - protocol = (data[3] << 24) + (data[2] << 16) + (data[1] << 8) + data[0]; - - // From the Wireshark Wiki: "AF_INET6, unfortunately, has - // different values in {NetBSD,OpenBSD,BSD/OS}, - // {FreeBSD,DragonFlyBSD}, and {Darwin/Mac OS X}, so an IPv6 - // packet might have a link-layer header with 24, 28, or 30 - // as the AF_ value." As we may be reading traces captured on - // platforms other than what we're running on, we accept them - // all here. - if ( protocol != AF_INET - && protocol != AF_INET6 - && protocol != 24 - && protocol != 28 - && protocol != 30 ) - { - sessions->Weird("non_ip_packet_in_null_transport", &hdr, data); - data = 0; - return; - } - - break; - } - - case DLT_EN10MB: - { - // Get protocol being carried from the ethernet frame. - protocol = (data[12] << 8) + data[13]; - - switch ( protocol ) - { - // MPLS carried over the ethernet frame. - case 0x8847: - have_mpls = true; - break; - - // VLAN carried over the ethernet frame. - case 0x8100: - data += get_link_header_size(datalink); - data += 4; // Skip the vlan header - pkt_hdr_size = 0; - - // Check for 802.1ah (Q-in-Q) containing IP. - // Only do a second layer of vlan tag - // stripping because there is no - // specification that allows for deeper - // nesting. - if ( ((data[2] << 8) + data[3]) == 0x0800 ) - data += 4; - - break; - - // PPPoE carried over the ethernet frame. - case 0x8864: - data += get_link_header_size(datalink); - protocol = (data[6] << 8) + data[7]; - data += 8; // Skip the PPPoE session and PPP header - pkt_hdr_size = 0; - - if ( protocol != 0x0021 && protocol != 0x0057 ) - { - // Neither IPv4 nor IPv6. - sessions->Weird("non_ip_packet_in_pppoe_encapsulation", &hdr, data); - data = 0; - return; - } - break; - } - - break; - } - - case DLT_PPP_SERIAL: - { - // Get PPP protocol. - protocol = (data[2] << 8) + data[3]; - - if ( protocol == 0x0281 ) - // MPLS Unicast - have_mpls = true; - - else if ( protocol != 0x0021 && protocol != 0x0057 ) - { - // Neither IPv4 nor IPv6. - sessions->Weird("non_ip_packet_in_ppp_encapsulation", &hdr, data); - data = 0; - return; - } - break; - } - } - - if ( have_mpls ) - { - // Remove the data link layer - data += get_link_header_size(datalink); - - // Denote a header size of zero before the IP header - pkt_hdr_size = 0; - - // Skip the MPLS label stack. - bool end_of_stack = false; - - while ( ! end_of_stack ) - { - end_of_stack = *(data + 2) & 0x01; - data += 4; - } - } - - if ( pseudo_realtime ) - { - current_pseudo = CheckPseudoTime(); - net_packet_arrival(current_pseudo, &hdr, data, pkt_hdr_size, this); - if ( ! first_wallclock ) - first_wallclock = current_time(true); - } - - else - net_packet_arrival(current_timestamp, &hdr, data, pkt_hdr_size, this); - - data = 0; - } - -bool PktSrc::GetCurrentPacket(const struct pcap_pkthdr** arg_hdr, - const u_char** arg_pkt) - { - if ( ! last_data ) - return false; - - *arg_hdr = &hdr; - *arg_pkt = last_data; - return true; - } - -int PktSrc::PrecompileFilter(int index, const char* filter) - { - // Compile filter. - BPF_Program* code = new BPF_Program(); - - if ( ! code->Compile(pd, filter, netmask, errbuf, sizeof(errbuf)) ) - { - delete code; - return 0; - } - - // Store it in hash. - HashKey* hash = new HashKey(HashKey(bro_int_t(index))); - BPF_Program* oldcode = filters.Lookup(hash); - if ( oldcode ) - delete oldcode; - - filters.Insert(hash, code); - delete hash; - - return 1; - } - -int PktSrc::SetFilter(int index) - { - // We don't want load-level filters for the secondary path. - if ( filter_type == TYPE_FILTER_SECONDARY && index > 0 ) - return 1; - - HashKey* hash = new HashKey(HashKey(bro_int_t(index))); - BPF_Program* code = filters.Lookup(hash); - delete hash; - - if ( ! code ) - { - safe_snprintf(errbuf, sizeof(errbuf), - "No precompiled pcap filter for index %d", - index); - return 0; - } - - if ( pcap_setfilter(pd, code->GetProgram()) < 0 ) - { - safe_snprintf(errbuf, sizeof(errbuf), - "pcap_setfilter(%d): %s", - index, pcap_geterr(pd)); - return 0; - } - -#ifndef HAVE_LINUX - // Linux doesn't clear counters when resetting filter. - stats.received = stats.dropped = stats.link = 0; -#endif - - return 1; - } - -void PktSrc::SetHdrSize() - { - int dl = pcap_datalink(pd); - hdr_size = get_link_header_size(dl); - - if ( hdr_size < 0 ) - { - safe_snprintf(errbuf, sizeof(errbuf), - "unknown data link type 0x%x", dl); - Close(); - } - - datalink = dl; - } - -void PktSrc::Close() - { - if ( pd ) - { - pcap_close(pd); - pd = 0; - closed = true; - } - } - -void PktSrc::AddSecondaryTablePrograms() - { - BPF_Program* program; - - loop_over_list(secondary_path->EventTable(), i) - { - SecondaryEvent* se = secondary_path->EventTable()[i]; - program = new BPF_Program(); - - if ( ! program->Compile(snaplen, datalink, se->Filter(), - netmask, errbuf, sizeof(errbuf)) ) - { - delete program; - Close(); - return; - } - - SecondaryProgram* sp = new SecondaryProgram(program, se); - program_list.append(sp); - } - } - -void PktSrc::Statistics(Stats* s) - { - if ( reading_traces ) - s->received = s->dropped = s->link = 0; - - else - { - struct pcap_stat pstat; - if ( pcap_stats(pd, &pstat) < 0 ) - { - reporter->Error("problem getting packet filter statistics: %s", - ErrorMsg()); - s->received = s->dropped = s->link = 0; - } - - else - { - s->dropped = pstat.ps_drop; - s->link = pstat.ps_recv; - } - } - - s->received = stats.received; - - if ( pseudo_realtime ) - s->dropped = 0; - - stats.dropped = s->dropped; - } - -PktInterfaceSrc::PktInterfaceSrc(const char* arg_interface, const char* filter, - PktSrc_Filter_Type ft) -: PktSrc() - { - char tmp_errbuf[PCAP_ERRBUF_SIZE]; - filter_type = ft; - - // Determine interface if not specified. - if ( ! arg_interface && ! (arg_interface = pcap_lookupdev(tmp_errbuf)) ) - { - safe_snprintf(errbuf, sizeof(errbuf), - "pcap_lookupdev: %s", tmp_errbuf); - return; - } - - interface = copy_string(arg_interface); - - // Determine network and netmask. - uint32 net; - if ( pcap_lookupnet(interface, &net, &netmask, tmp_errbuf) < 0 ) - { - // ### The lookup can fail if no address is assigned to - // the interface; and libpcap doesn't have any useful notion - // of error codes, just error strings - how bogus - so we - // just kludge around the error :-(. - // sprintf(errbuf, "pcap_lookupnet %s", tmp_errbuf); - // return; - net = 0; - netmask = 0xffffff00; - } - - // We use the smallest time-out possible to return almost immediately if - // no packets are available. (We can't use set_nonblocking() as it's - // broken on FreeBSD: even when select() indicates that we can read - // something, we may get nothing if the store buffer hasn't filled up - // yet.) - pd = pcap_open_live(interface, snaplen, 1, 1, tmp_errbuf); - - if ( ! pd ) - { - safe_snprintf(errbuf, sizeof(errbuf), - "pcap_open_live: %s", tmp_errbuf); - closed = true; - return; - } - - // ### This needs autoconf'ing. -#ifdef HAVE_PCAP_INT_H - reporter->Info("pcap bufsize = %d\n", ((struct pcap *) pd)->bufsize); -#endif - -#ifdef HAVE_LINUX - if ( pcap_setnonblock(pd, 1, tmp_errbuf) < 0 ) - { - safe_snprintf(errbuf, sizeof(errbuf), - "pcap_setnonblock: %s", tmp_errbuf); - pcap_close(pd); - closed = true; - return; - } -#endif - selectable_fd = pcap_fileno(pd); - - if ( PrecompileFilter(0, filter) && SetFilter(0) ) - { - SetHdrSize(); - - if ( closed ) - // Couldn't get header size. - return; - - reporter->Info("listening on %s, capture length %d bytes\n", interface, snaplen); - } - else - closed = true; - } - - -PktFileSrc::PktFileSrc(const char* arg_readfile, const char* filter, - PktSrc_Filter_Type ft) -: PktSrc() - { - readfile = copy_string(arg_readfile); - - filter_type = ft; - - pd = pcap_open_offline((char*) readfile, errbuf); - - if ( pd && PrecompileFilter(0, filter) && SetFilter(0) ) - { - SetHdrSize(); - - if ( closed ) - // Unknown link layer type. - return; - - // We don't put file sources into non-blocking mode as - // otherwise we would not be able to identify the EOF. - - selectable_fd = fileno(pcap_file(pd)); - - if ( selectable_fd < 0 ) - reporter->InternalError("OS does not support selectable pcap fd"); - } - else - closed = true; - } - - -SecondaryPath::SecondaryPath() - { - filter = 0; - - // Glue together the secondary filter, if exists. - Val* secondary_fv = internal_val("secondary_filters"); - if ( secondary_fv->AsTableVal()->Size() == 0 ) - return; - - int did_first = 0; - const TableEntryValPDict* v = secondary_fv->AsTable(); - IterCookie* c = v->InitForIteration(); - TableEntryVal* tv; - HashKey* h; - - while ( (tv = v->NextEntry(h, c)) ) - { - // Get the index values. - ListVal* index = - secondary_fv->AsTableVal()->RecoverIndex(h); - - const char* str = - index->Index(0)->Ref()->AsString()->CheckString(); - - if ( ++did_first == 1 ) - { - filter = copy_string(str); - } - else - { - if ( strlen(filter) > 0 ) - { - char* tmp_f = new char[strlen(str) + strlen(filter) + 32]; - if ( strlen(str) == 0 ) - sprintf(tmp_f, "%s", filter); - else - sprintf(tmp_f, "(%s) or (%s)", filter, str); - delete [] filter; - filter = tmp_f; - } - } - - // Build secondary_path event table item and link it. - SecondaryEvent* se = - new SecondaryEvent(index->Index(0)->Ref()->AsString()->CheckString(), - tv->Value()->AsFunc() ); - - event_list.append(se); - - delete h; - Unref(index); - } - } - -SecondaryPath::~SecondaryPath() - { - loop_over_list(event_list, i) - delete event_list[i]; - - delete [] filter; - } - - -SecondaryProgram::~SecondaryProgram() - { - delete program; - } - -PktDumper::PktDumper(const char* arg_filename, bool arg_append) - { - filename[0] = '\0'; - is_error = false; - append = arg_append; - dumper = 0; - open_time = 0.0; - - // We need a pcap_t with a reasonable link-layer type. We try to get it - // from the packet sources. If not available, we fall back to Ethernet. - // FIXME: Perhaps we should make this configurable? - int linktype = -1; - - if ( pkt_srcs.length() ) - linktype = pkt_srcs[0]->LinkType(); - - if ( linktype < 0 ) - linktype = DLT_EN10MB; - - pd = pcap_open_dead(linktype, 8192); - if ( ! pd ) - { - Error("error for pcap_open_dead"); - return; - } - - if ( arg_filename ) - Open(arg_filename); - } - -bool PktDumper::Open(const char* arg_filename) - { - if ( ! arg_filename && ! *filename ) - { - Error("no filename given"); - return false; - } - - if ( arg_filename ) - { - if ( dumper && streq(arg_filename, filename) ) - // Already open. - return true; - - safe_strncpy(filename, arg_filename, FNBUF_LEN); - } - - if ( dumper ) - Close(); - - struct stat s; - int exists = 0; - - if ( append ) - { - // See if output file already exists (and is non-empty). - exists = stat(filename, &s); ; - - if ( exists < 0 && errno != ENOENT ) - { - Error(fmt("can't stat file %s: %s", filename, strerror(errno))); - return false; - } - } - - if ( ! append || exists < 0 || s.st_size == 0 ) - { - // Open new file. - dumper = pcap_dump_open(pd, filename); - if ( ! dumper ) - { - Error(pcap_geterr(pd)); - return false; - } - } - - else - { - // Old file and we need to append, which, unfortunately, - // is not supported by libpcap. So, we have to hack a - // little bit, knowing that pcap_dumpter_t is, in fact, - // a FILE ... :-( - dumper = (pcap_dumper_t*) fopen(filename, "a"); - if ( ! dumper ) - { - Error(fmt("can't open dump %s: %s", filename, strerror(errno))); - return false; - } - } - - open_time = network_time; - is_error = false; - return true; - } - -bool PktDumper::Close() - { - if ( dumper ) - { - pcap_dump_close(dumper); - dumper = 0; - is_error = false; - } - - return true; - } - -bool PktDumper::Dump(const struct pcap_pkthdr* hdr, const u_char* pkt) - { - if ( ! dumper ) - return false; - - if ( ! open_time ) - open_time = network_time; - - pcap_dump((u_char*) dumper, hdr, pkt); - - return true; - } - -void PktDumper::Error(const char* errstr) - { - safe_strncpy(errbuf, errstr, sizeof(errbuf)); - is_error = true; - } - -int get_link_header_size(int dl) - { - switch ( dl ) { - case DLT_NULL: - return 4; - - case DLT_EN10MB: - return 14; - - case DLT_FDDI: - return 13 + 8; // fddi_header + LLC - -#ifdef DLT_LINUX_SLL - case DLT_LINUX_SLL: - return 16; -#endif - - case DLT_PPP_SERIAL: // PPP_SERIAL - return 4; - - case DLT_RAW: - return 0; - } - - return -1; - } diff --git a/src/PktSrc.h b/src/PktSrc.h deleted file mode 100644 index 70eef4dd00..0000000000 --- a/src/PktSrc.h +++ /dev/null @@ -1,258 +0,0 @@ -// See the file "COPYING" in the main distribution directory for copyright. - -#ifndef pktsrc_h -#define pktsrc_h - -#include "Dict.h" -#include "Expr.h" -#include "BPF_Program.h" -#include "IOSource.h" -#include "RemoteSerializer.h" - -#define BRO_PCAP_ERRBUF_SIZE PCAP_ERRBUF_SIZE + 256 - -extern "C" { -#include -} - -declare(PDict,BPF_Program); - -// Whether a PktSrc object is used by the normal filter structure or the -// secondary-path structure. -typedef enum { - TYPE_FILTER_NORMAL, // the normal filter - TYPE_FILTER_SECONDARY, // the secondary-path filter -} PktSrc_Filter_Type; - - -// {filter,event} tuples conforming the secondary path. -class SecondaryEvent { -public: - SecondaryEvent(const char* arg_filter, Func* arg_event) - { - filter = arg_filter; - event = arg_event; - } - - const char* Filter() { return filter; } - Func* Event() { return event; } - -private: - const char* filter; - Func* event; -}; - -declare(PList,SecondaryEvent); -typedef PList(SecondaryEvent) secondary_event_list; - - - -class SecondaryPath { -public: - SecondaryPath(); - ~SecondaryPath(); - - secondary_event_list& EventTable() { return event_list; } - const char* Filter() { return filter; } - -private: - secondary_event_list event_list; - // OR'ed union of all SecondaryEvent filters - char* filter; -}; - -// Main secondary-path object. -extern SecondaryPath* secondary_path; - - -// {program, {filter,event}} tuple table. -class SecondaryProgram { -public: - SecondaryProgram(BPF_Program* arg_program, SecondaryEvent* arg_event) - { - program = arg_program; - event = arg_event; - } - - ~SecondaryProgram(); - - BPF_Program* Program() { return program; } - SecondaryEvent* Event() { return event; } - -private: - // Associated program. - BPF_Program *program; - - // Event that is run in case the program is matched. - SecondaryEvent* event; -}; - -declare(PList,SecondaryProgram); -typedef PList(SecondaryProgram) secondary_program_list; - - - -class PktSrc : public IOSource { -public: - ~PktSrc(); - - // IOSource interface - bool IsReady(); - void GetFds(int* read, int* write, int* except); - double NextTimestamp(double* local_network_time); - void Process(); - const char* Tag() { return "PktSrc"; } - - const char* ErrorMsg() const { return errbuf; } - void ClearErrorMsg() { *errbuf ='\0'; } - - // Returns the packet last processed; false if there is no - // current packet available. - bool GetCurrentPacket(const pcap_pkthdr** hdr, const u_char** pkt); - - int HdrSize() const { return hdr_size; } - int DataLink() const { return datalink; } - - void ConsumePacket() { data = 0; } - - int IsLive() const { return interface != 0; } - - pcap_t* PcapHandle() const { return pd; } - int LinkType() const { return pcap_datalink(pd); } - - const char* ReadFile() const { return readfile; } - const char* Interface() const { return interface; } - PktSrc_Filter_Type FilterType() const { return filter_type; } - void AddSecondaryTablePrograms(); - const secondary_program_list& ProgramTable() const - { return program_list; } - - // Signal packet source that processing was suspended and is now going - // to be continued. - void ContinueAfterSuspend(); - - // Only valid in pseudo-realtime mode. - double CurrentPacketTimestamp() { return current_pseudo; } - double CurrentPacketWallClock(); - - struct Stats { - unsigned int received; // pkts received (w/o drops) - unsigned int dropped; // pkts dropped - unsigned int link; // total packets on link - // (not always not available) - }; - - virtual void Statistics(Stats* stats); - - // Precompiles a filter and associates the given index with it. - // Returns true on success, 0 if a problem occurred. - virtual int PrecompileFilter(int index, const char* filter); - - // Activates the filter with the given index. - // Returns true on success, 0 if a problem occurred. - virtual int SetFilter(int index); - -protected: - PktSrc(); - - static const int PCAP_TIMEOUT = 20; - - void SetHdrSize(); - - virtual void Close(); - - // Returns 1 on success, 0 on time-out/gone dry. - virtual int ExtractNextPacket(); - - // Checks if the current packet has a pseudo-time <= current_time. - // If yes, returns pseudo-time, otherwise 0. - double CheckPseudoTime(); - - double current_timestamp; - double next_timestamp; - - // Only set in pseudo-realtime mode. - double first_timestamp; - double first_wallclock; - double current_wallclock; - double current_pseudo; - - struct pcap_pkthdr hdr; - const u_char* data; // contents of current packet - const u_char* last_data; // same, but unaffected by consuming - int hdr_size; - int datalink; - double next_sync_point; // For trace synchronziation in pseudo-realtime - - char* interface; // nil if not reading from an interface - char* readfile; // nil if not reading from a file - - pcap_t* pd; - int selectable_fd; - uint32 netmask; - char errbuf[BRO_PCAP_ERRBUF_SIZE]; - - Stats stats; - - PDict(BPF_Program) filters; // precompiled filters - - PktSrc_Filter_Type filter_type; // normal path or secondary path - secondary_program_list program_list; -}; - -class PktInterfaceSrc : public PktSrc { -public: - PktInterfaceSrc(const char* interface, const char* filter, - PktSrc_Filter_Type ft=TYPE_FILTER_NORMAL); -}; - -class PktFileSrc : public PktSrc { -public: - PktFileSrc(const char* readfile, const char* filter, - PktSrc_Filter_Type ft=TYPE_FILTER_NORMAL); -}; - - -extern int get_link_header_size(int dl); - -class PktDumper { -public: - PktDumper(const char* file = 0, bool append = false); - ~PktDumper() { Close(); } - - bool Open(const char* file = 0); - bool Close(); - bool Dump(const struct pcap_pkthdr* hdr, const u_char* pkt); - - pcap_dumper_t* PcapDumper() { return dumper; } - - const char* FileName() const { return filename; } - bool IsError() const { return is_error; } - const char* ErrorMsg() const { return errbuf; } - - // This heuristic will horribly fail if we're using packets - // with different link layers. (If we can't derive a reasonable value - // from the packet sources, our fall-back is Ethernet.) - int HdrSize() const - { return get_link_header_size(pcap_datalink(pd)); } - - // Network time when dump file was opened. - double OpenTime() const { return open_time; } - -private: - void InitPd(); - void Error(const char* str); - - static const int FNBUF_LEN = 1024; - char filename[FNBUF_LEN]; - - bool append; - pcap_dumper_t* dumper; - pcap_t* pd; - double open_time; - - bool is_error; - char errbuf[BRO_PCAP_ERRBUF_SIZE]; -}; - -#endif diff --git a/src/RemoteSerializer.cc b/src/RemoteSerializer.cc index c8cf03667b..b0db8fafe8 100644 --- a/src/RemoteSerializer.cc +++ b/src/RemoteSerializer.cc @@ -188,10 +188,11 @@ #include "File.h" #include "Conn.h" #include "Reporter.h" -#include "threading/SerialTypes.h" -#include "logging/Manager.h" #include "IPAddr.h" #include "bro_inet_ntop.h" +#include "threading/SerialTypes.h" +#include "logging/Manager.h" +#include "iosource/Manager.h" extern "C" { #include "setsignal.h" @@ -284,10 +285,10 @@ struct ping_args { \ if ( ! c ) \ { \ - idle = io->IsIdle();\ + SetIdle(io->IsIdle());\ return true; \ } \ - idle = false; \ + SetIdle(false); \ } static const char* msgToStr(int msg) @@ -536,7 +537,6 @@ RemoteSerializer::RemoteSerializer() current_sync_point = 0; syncing_times = false; io = 0; - closed = false; terminating = false; in_sync = 0; last_flush = 0; @@ -574,7 +574,7 @@ void RemoteSerializer::Init() Fork(); - io_sources.Register(this); + iosource_mgr->Register(this); Log(LogInfo, fmt("communication started, parent pid is %d, child pid is %d", getpid(), child_pid)); initialized = 1; @@ -1278,7 +1278,7 @@ bool RemoteSerializer::Listen(const IPAddr& ip, uint16 port, bool expect_ssl, return false; listening = true; - closed = false; + SetClosed(false); return true; } @@ -1347,7 +1347,7 @@ bool RemoteSerializer::StopListening() return false; listening = false; - closed = ! IsActive(); + SetClosed(! IsActive()); return true; } @@ -1385,7 +1385,7 @@ double RemoteSerializer::NextTimestamp(double* local_network_time) if ( received_logs > 0 ) { // If we processed logs last time, assume there's more. - idle = false; + SetIdle(false); received_logs = 0; return timer_mgr->Time(); } @@ -1400,7 +1400,7 @@ double RemoteSerializer::NextTimestamp(double* local_network_time) pt = timer_mgr->Time(); if ( packets.length() ) - idle = false; + SetIdle(false); if ( et >= 0 && (et < pt || pt < 0) ) return et; @@ -1479,7 +1479,7 @@ void RemoteSerializer::Process() } if ( packets.length() ) - idle = false; + SetIdle(false); } void RemoteSerializer::Finish() @@ -1511,7 +1511,7 @@ bool RemoteSerializer::Poll(bool may_block) } io->Flush(); - idle = false; + SetIdle(false); switch ( msgstate ) { case TYPE: @@ -1695,7 +1695,7 @@ bool RemoteSerializer::DoMessage() case MSG_TERMINATE: assert(terminating); - io_sources.Terminate(); + iosource_mgr->Terminate(); return true; case MSG_REMOTE_PRINT: @@ -1885,7 +1885,7 @@ void RemoteSerializer::RemovePeer(Peer* peer) delete peer->cache_out; delete peer; - closed = ! IsActive(); + SetClosed(! IsActive()); if ( in_sync == peer ) in_sync = 0; @@ -2850,7 +2850,7 @@ void RemoteSerializer::GotEvent(const char* name, double time, BufferedEvent* e = new BufferedEvent; // Our time, not the time when the event was generated. - e->time = pkt_srcs.length() ? + e->time = iosource_mgr->GetPktSrcs().size() ? time_t(network_time) : time_t(timer_mgr->Time()); e->src = current_peer->id; @@ -3094,7 +3094,7 @@ RecordVal* RemoteSerializer::GetPeerVal(PeerID id) void RemoteSerializer::ChildDied() { Log(LogError, "child died"); - closed = true; + SetClosed(true); child_pid = 0; // Shut down the main process as well. @@ -3188,7 +3188,7 @@ void RemoteSerializer::FatalError(const char* msg) Log(LogError, msg); reporter->Error("%s", msg); - closed = true; + SetClosed(true); if ( kill(child_pid, SIGQUIT) < 0 ) reporter->Warning("warning: cannot kill child pid %d, %s", child_pid, strerror(errno)); diff --git a/src/RemoteSerializer.h b/src/RemoteSerializer.h index 5ff7fff8d6..f8b306f002 100644 --- a/src/RemoteSerializer.h +++ b/src/RemoteSerializer.h @@ -6,7 +6,7 @@ #include "Dict.h" #include "List.h" #include "Serializer.h" -#include "IOSource.h" +#include "iosource/IOSource.h" #include "Stats.h" #include "File.h" #include "logging/WriterBackend.h" @@ -22,7 +22,7 @@ namespace threading { } // This class handles the communication done in Bro's main loop. -class RemoteSerializer : public Serializer, public IOSource { +class RemoteSerializer : public Serializer, public iosource::IOSource { public: RemoteSerializer(); virtual ~RemoteSerializer(); diff --git a/src/Serializer.cc b/src/Serializer.cc index 156ad67f2e..74740497a1 100644 --- a/src/Serializer.cc +++ b/src/Serializer.cc @@ -19,6 +19,7 @@ #include "Conn.h" #include "Timer.h" #include "RemoteSerializer.h" +#include "iosource/Manager.h" Serializer::Serializer(SerializationFormat* arg_format) { @@ -1045,7 +1046,7 @@ EventPlayer::EventPlayer(const char* file) Error(fmt("event replayer: cannot open %s", file)); if ( ReadHeader() ) - io_sources.Register(this); + iosource_mgr->Register(this); } EventPlayer::~EventPlayer() @@ -1085,7 +1086,7 @@ double EventPlayer::NextTimestamp(double* local_network_time) { UnserialInfo info(this); Unserialize(&info); - closed = io->Eof(); + SetClosed(io->Eof()); } if ( ! ne_time ) @@ -1142,7 +1143,7 @@ bool Packet::Serialize(SerialInfo* info) const static BroFile* profiling_output = 0; #ifdef DEBUG -static PktDumper* dump = 0; +static iosource::PktDumper* dump = 0; #endif Packet* Packet::Unserialize(UnserialInfo* info) @@ -1188,7 +1189,7 @@ Packet* Packet::Unserialize(UnserialInfo* info) p->hdr = hdr; p->pkt = (u_char*) pkt; p->tag = tag; - p->hdr_size = get_link_header_size(p->link_type); + p->hdr_size = iosource::PktSrc::GetLinkHeaderSize(p->link_type); delete [] tag; @@ -1213,9 +1214,15 @@ Packet* Packet::Unserialize(UnserialInfo* info) if ( debug_logger.IsEnabled(DBG_TM) ) { if ( ! dump ) - dump = new PktDumper("tm.pcap"); + dump = iosource_mgr->OpenPktDumper("tm.pcap", true); - dump->Dump(p->hdr, p->pkt); + if ( dump ) + { + iosource::PktDumper::Packet dp; + dp.hdr = p->hdr; + dp.data = p->pkt; + dump->Record(&dp); + } } #endif diff --git a/src/Serializer.h b/src/Serializer.h index af4878ccf5..3be2da5134 100644 --- a/src/Serializer.h +++ b/src/Serializer.h @@ -15,7 +15,7 @@ #include "SerialInfo.h" #include "IP.h" #include "Timer.h" -#include "IOSource.h" +#include "iosource/IOSource.h" #include "Reporter.h" class SerializationCache; @@ -350,7 +350,7 @@ public: }; // Plays a file of events back. -class EventPlayer : public FileSerializer, public IOSource { +class EventPlayer : public FileSerializer, public iosource::IOSource { public: EventPlayer(const char* file); virtual ~EventPlayer(); diff --git a/src/Sessions.cc b/src/Sessions.cc index acc306d277..c84c677db4 100644 --- a/src/Sessions.cc +++ b/src/Sessions.cc @@ -168,7 +168,7 @@ void NetSessions::Done() void NetSessions::DispatchPacket(double t, const struct pcap_pkthdr* hdr, const u_char* pkt, int hdr_size, - PktSrc* src_ps, PacketSortElement* pkt_elem) + iosource::PktSrc* src_ps, PacketSortElement* pkt_elem) { const struct ip* ip_hdr = 0; const u_char* ip_data = 0; @@ -185,10 +185,14 @@ void NetSessions::DispatchPacket(double t, const struct pcap_pkthdr* hdr, // Blanket encapsulation hdr_size += encap_hdr_size; +#if 0 if ( src_ps->FilterType() == TYPE_FILTER_NORMAL ) NextPacket(t, hdr, pkt, hdr_size, pkt_elem); else NextPacketSecondary(t, hdr, pkt, hdr_size, src_ps); +#else + NextPacket(t, hdr, pkt, hdr_size, pkt_elem); +#endif } void NetSessions::NextPacket(double t, const struct pcap_pkthdr* hdr, @@ -278,7 +282,7 @@ void NetSessions::NextPacket(double t, const struct pcap_pkthdr* hdr, void NetSessions::NextPacketSecondary(double /* t */, const struct pcap_pkthdr* hdr, const u_char* const pkt, int hdr_size, - const PktSrc* src_ps) + const iosource::PktSrc* src_ps) { SegmentProfiler(segment_logger, "processing-secondary-packet"); @@ -291,6 +295,7 @@ void NetSessions::NextPacketSecondary(double /* t */, const struct pcap_pkthdr* return; } +#if 0 const struct ip* ip = (const struct ip*) (pkt + hdr_size); if ( ip->ip_v == 4 ) { @@ -321,6 +326,7 @@ void NetSessions::NextPacketSecondary(double /* t */, const struct pcap_pkthdr* delete args; } } +#endif } int NetSessions::CheckConnectionTag(Connection* conn) @@ -1341,14 +1347,24 @@ void NetSessions::DumpPacket(const struct pcap_pkthdr* hdr, return; if ( len == 0 ) - pkt_dumper->Dump(hdr, pkt); + { + iosource::PktDumper::Packet p; + p.hdr = hdr; + p.data = pkt; + pkt_dumper->Record(&p); + } + else { struct pcap_pkthdr h = *hdr; h.caplen = len; if ( h.caplen > hdr->caplen ) reporter->InternalError("bad modified caplen"); - pkt_dumper->Dump(&h, pkt); + + iosource::PktDumper::Packet p; + p.hdr = &h; + p.data = pkt; + pkt_dumper->Record(&p); } } diff --git a/src/Sessions.h b/src/Sessions.h index 1788541f45..4f12bd1240 100644 --- a/src/Sessions.h +++ b/src/Sessions.h @@ -74,7 +74,7 @@ public: // employing the packet sorter first. void DispatchPacket(double t, const struct pcap_pkthdr* hdr, const u_char* const pkt, int hdr_size, - PktSrc* src_ps, PacketSortElement* pkt_elem); + iosource::PktSrc* src_ps, PacketSortElement* pkt_elem); void Done(); // call to drain events before destructing @@ -225,7 +225,7 @@ protected: void NextPacketSecondary(double t, const struct pcap_pkthdr* hdr, const u_char* const pkt, int hdr_size, - const PktSrc* src_ps); + const iosource::PktSrc* src_ps); // Record the given packet (if a dumper is active). If len=0 // then the whole packet is recorded, otherwise just the first diff --git a/src/bro.bif b/src/bro.bif index 24dff3c77c..2b94307143 100644 --- a/src/bro.bif +++ b/src/bro.bif @@ -21,6 +21,7 @@ #include "IPAddr.h" #include "util.h" #include "file_analysis/Manager.h" +#include "iosource/Manager.h" using namespace std; @@ -33,7 +34,7 @@ TableType* var_sizes; // and hence it's declared in NetVar.{h,cc}. extern RecordType* gap_info; -static PktDumper* addl_pkt_dumper = 0; +static iosource::PktDumper* addl_pkt_dumper = 0; bro_int_t parse_int(const char*& fmt) { @@ -1657,11 +1658,14 @@ function net_stats%(%): NetStats unsigned int drop = 0; unsigned int link = 0; - loop_over_list(pkt_srcs, i) - { - PktSrc* ps = pkt_srcs[i]; + const iosource::Manager::PktSrcList& pkt_srcs(iosource_mgr->GetPktSrcs()); - struct PktSrc::Stats stat; + for ( iosource::Manager::PktSrcList::const_iterator i = pkt_srcs.begin(); + i != pkt_srcs.end(); i++ ) + { + iosource::PktSrc* ps = *i; + + struct iosource::PktSrc::Stats stat; ps->Statistics(&stat); recv += stat.received; drop += stat.dropped; @@ -3206,10 +3210,15 @@ function dump_current_packet%(file_name: string%) : bool return new Val(0, TYPE_BOOL); if ( ! addl_pkt_dumper ) - addl_pkt_dumper = new PktDumper(0, true); + addl_pkt_dumper = iosource_mgr->OpenPktDumper(file_name->CheckString(), true); - addl_pkt_dumper->Open(file_name->CheckString()); - addl_pkt_dumper->Dump(hdr, pkt); + if ( addl_pkt_dumper ) + { + iosource::PktDumper::Packet p; + p.hdr = hdr; + p.data = pkt; + addl_pkt_dumper->Record(&p); + } return new Val(! addl_pkt_dumper->IsError(), TYPE_BOOL); %} @@ -3266,10 +3275,15 @@ function dump_packet%(pkt: pcap_packet, file_name: string%) : bool hdr.len = (*pkt_vl)[3]->AsCount(); if ( ! addl_pkt_dumper ) - addl_pkt_dumper = new PktDumper(0, true); + addl_pkt_dumper = iosource_mgr->OpenPktDumper(file_name->CheckString(), true); - addl_pkt_dumper->Open(file_name->CheckString()); - addl_pkt_dumper->Dump(&hdr, (*pkt_vl)[4]->AsString()->Bytes()); + if ( addl_pkt_dumper ) + { + iosource::PktDumper::Packet p; + p.hdr = &hdr; + p.data = (*pkt_vl)[4]->AsString()->Bytes(); + addl_pkt_dumper->Record(&p); + } return new Val(addl_pkt_dumper->IsError(), TYPE_BOOL); %} @@ -4030,14 +4044,14 @@ function rotate_file_by_name%(f: string%): rotate_info bool is_addl_pkt_dumper = false; // Special case: one of current dump files. - if ( pkt_dumper && streq(pkt_dumper->FileName(), f->CheckString()) ) + if ( pkt_dumper && streq(pkt_dumper->Path().c_str(), f->CheckString()) ) { is_pkt_dumper = true; pkt_dumper->Close(); } if ( addl_pkt_dumper && - streq(addl_pkt_dumper->FileName(), f->CheckString()) ) + streq(addl_pkt_dumper->Path().c_str(), f->CheckString()) ) { is_addl_pkt_dumper = true; addl_pkt_dumper->Close(); @@ -4156,15 +4170,18 @@ function precompile_pcap_filter%(id: PcapFilterID, s: string%): bool %{ bool success = true; - loop_over_list(pkt_srcs, i) - { - pkt_srcs[i]->ClearErrorMsg(); + const iosource::Manager::PktSrcList& pkt_srcs(iosource_mgr->GetPktSrcs()); - if ( ! pkt_srcs[i]->PrecompileFilter(id->ForceAsInt(), + for ( iosource::Manager::PktSrcList::const_iterator i = pkt_srcs.begin(); + i != pkt_srcs.end(); i++ ) + { + iosource::PktSrc* ps = *i; + + if ( ! ps->PrecompileFilter(id->ForceAsInt(), s->CheckString()) ) { reporter->Error("precompile_pcap_filter: %s", - pkt_srcs[i]->ErrorMsg()); + ps->ErrorMsg()); success = false; } } @@ -4194,11 +4211,14 @@ function install_pcap_filter%(id: PcapFilterID%): bool %{ bool success = true; - loop_over_list(pkt_srcs, i) - { - pkt_srcs[i]->ClearErrorMsg(); + const iosource::Manager::PktSrcList& pkt_srcs(iosource_mgr->GetPktSrcs()); - if ( ! pkt_srcs[i]->SetFilter(id->ForceAsInt()) ) + for ( iosource::Manager::PktSrcList::const_iterator i = pkt_srcs.begin(); + i != pkt_srcs.end(); i++ ) + { + iosource::PktSrc* ps = *i; + + if ( ! ps->SetFilter(id->ForceAsInt()) ) success = false; } @@ -4221,9 +4241,14 @@ function install_pcap_filter%(id: PcapFilterID%): bool ## uninstall_dst_net_filter function pcap_error%(%): string %{ - loop_over_list(pkt_srcs, i) + const iosource::Manager::PktSrcList& pkt_srcs(iosource_mgr->GetPktSrcs()); + + for ( iosource::Manager::PktSrcList::const_iterator i = pkt_srcs.begin(); + i != pkt_srcs.end(); i++ ) { - const char* err = pkt_srcs[i]->ErrorMsg(); + iosource::PktSrc* ps = *i; + + const char* err = ps->ErrorMsg(); if ( *err ) return new StringVal(err); } diff --git a/src/iosource/CMakeLists.txt b/src/iosource/CMakeLists.txt new file mode 100644 index 0000000000..a9246e8de9 --- /dev/null +++ b/src/iosource/CMakeLists.txt @@ -0,0 +1,23 @@ + +include(BroSubdir) + +include_directories(BEFORE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} +) + +add_subdirectory(pktsrc) + + +set(iosource_SRCS + Component.cc + Manager.cc + + pktsrc/Component.cc + pktsrc/PktDumper.cc + pktsrc/PktSrc.cc +) + +bro_add_subdir_library(iosource ${iosource_SRCS}) +add_dependencies(bro_iosource generate_outputs) + diff --git a/src/iosource/Component.cc b/src/iosource/Component.cc new file mode 100644 index 0000000000..5f916c0a11 --- /dev/null +++ b/src/iosource/Component.cc @@ -0,0 +1,44 @@ + +#include "Component.h" + +#include "Desc.h" + +using namespace iosource; + +Component::Component(const std::string& arg_name) + : plugin::Component(plugin::component::IOSOURCE) + { + name = arg_name; + } + +Component::Component(plugin::component::Type type, const std::string& arg_name) + : plugin::Component(type) + { + name = arg_name; + } + +Component::Component(const Component& other) + : plugin::Component(other) + { + name = other.name; + } + +Component::~Component() + { + } + +void Component::Describe(ODesc* d) const + { + plugin::Component::Describe(d); + d->Add(name); + } + +Component& Component::operator=(const Component& other) + { + plugin::Component::operator=(other); + + if ( &other != this ) + name = other.name; + + return *this; + } diff --git a/src/iosource/Component.h b/src/iosource/Component.h new file mode 100644 index 0000000000..b56eeb038c --- /dev/null +++ b/src/iosource/Component.h @@ -0,0 +1,56 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef IOSOURCE_PLUGIN_COMPONENT_H +#define IOSOURCE_PLUGIN_COMPONENT_H + +#include "plugin/Component.h" + +namespace iosource { + +class IOSource; + +/** + * Component description for plugins providing IOSources. + */ +class Component : public plugin::Component { +public: + typedef IOSource* (*factory_callback)(); + + /** + * XXX + */ + Component(const std::string& name); + + /** + * Copy constructor. + */ + Component(const Component& other); + + /** + * Destructor. + */ + ~Component(); + + /** + * XXX + */ + virtual const char* Name() const { return name.c_str(); } + + /** + * Generates a human-readable description of the component. This goes + * into the output of \c "bro -NN". + */ + virtual void Describe(ODesc* d) const; + + Component& operator=(const Component& other); + +protected: + Component(plugin::component::Type type, const std::string& name); + +private: + std::string name; +}; + +} + +#endif diff --git a/src/IOSource.h b/src/iosource/IOSource.h similarity index 51% rename from src/IOSource.h rename to src/iosource/IOSource.h index db50bbd2a9..3419152a9a 100644 --- a/src/IOSource.h +++ b/src/iosource/IOSource.h @@ -1,13 +1,17 @@ -// Interface for classes providing/consuming data during Bro's main loop. +// See the file "COPYING" in the main distribution directory for copyright. -#ifndef iosource_h -#define iosource_h +#ifndef IOSOURCE_IOSOURCE_H +#define IOSOURCE_IOSOURCE_H + +#include -#include #include "Timer.h" -using namespace std; +namespace iosource { +/** + * Interface class for components providing/consuming data inside Bro's main loop. + */ class IOSource { public: IOSource() { idle = closed = false; } @@ -20,6 +24,12 @@ public: // Otherwise, source may be removed. bool IsOpen() const { return ! closed; } + // XXX + virtual void Init() { } + + // XXX + virtual void Done() { } + // Returns select'able fds (leaves args untouched if we don't have // selectable fds). virtual void GetFds(int* read, int* write, int* except) = 0; @@ -46,58 +56,18 @@ public: protected: // Derived classed are to set this to true if they have gone dry // temporarily. - bool idle; + void SetIdle(bool is_idle) { idle = is_idle; } + // Derived classed are to set this to true if they have gone dry - // permanently. + // temporarily. + void SetClosed(bool is_closed) { closed = is_closed; } + +private: + bool idle; bool closed; }; -class IOSourceRegistry { -public: - IOSourceRegistry() { call_count = 0; dont_counts = 0; } - ~IOSourceRegistry(); - - // If dont_count is true, this source does not contribute to the - // number of IOSources returned by Size(). The effect is that - // if all sources but the non-counting ones have gone dry, - // processing will shut down. - void Register(IOSource* src, bool dont_count = false); - - // This may block for some time. - IOSource* FindSoonest(double* ts); - - int Size() const { return sources.size() - dont_counts; } - - // Terminate IOSource processing immediately by removing all - // sources (and therefore returning a Size() of zero). - void Terminate() { RemoveAll(); } - -protected: - // When looking for a source with something to process, - // every SELECT_FREQUENCY calls we will go ahead and - // block on a select(). - static const int SELECT_FREQUENCY = 25; - - // Microseconds to wait in an empty select if no source is ready. - static const int SELECT_TIMEOUT = 50; - - void RemoveAll(); - - unsigned int call_count; - int dont_counts; - - struct Source { - IOSource* src; - int fd_read; - int fd_write; - int fd_except; - }; - - typedef list SourceList; - SourceList sources; -}; - -extern IOSourceRegistry io_sources; +} #endif diff --git a/src/IOSource.cc b/src/iosource/Manager.cc similarity index 52% rename from src/IOSource.cc rename to src/iosource/Manager.cc index d47007caad..9c14330868 100644 --- a/src/IOSource.cc +++ b/src/iosource/Manager.cc @@ -1,3 +1,4 @@ + #include #include #include @@ -5,26 +6,37 @@ #include -#include "util.h" +#include "Manager.h" #include "IOSource.h" +#include "pktsrc/PktSrc.h" +#include "pktsrc/PktDumper.h" +#include "pktsrc/Component.h" +#include "plugin/Manager.h" -IOSourceRegistry io_sources; +#include "util.h" -IOSourceRegistry::~IOSourceRegistry() +#define DEFAULT_PREFIX "pcap" + +using namespace iosource; + +Manager::~Manager() { for ( SourceList::iterator i = sources.begin(); i != sources.end(); ++i ) + { + (*i)->src->Done(); delete *i; + } sources.clear(); } -void IOSourceRegistry::RemoveAll() +void Manager::RemoveAll() { // We're cheating a bit here ... dont_counts = sources.size(); } -IOSource* IOSourceRegistry::FindSoonest(double* ts) +IOSource* Manager::FindSoonest(double* ts) { // Remove sources which have gone dry. For simplicity, we only // remove at most one each time. @@ -101,9 +113,9 @@ IOSource* IOSourceRegistry::FindSoonest(double* ts) FD_SET(src->fd_write, &fd_write); FD_SET(src->fd_except, &fd_except); - maxx = max(src->fd_read, maxx); - maxx = max(src->fd_write, maxx); - maxx = max(src->fd_except, maxx); + maxx = std::max(src->fd_read, maxx); + maxx = std::max(src->fd_write, maxx); + maxx = std::max(src->fd_except, maxx); } // We can't block indefinitely even when all sources are dry: @@ -166,11 +178,130 @@ finished: return soonest_src; } -void IOSourceRegistry::Register(IOSource* src, bool dont_count) +void Manager::Register(IOSource* src, bool dont_count) { + src->Init(); Source* s = new Source; s->src = src; if ( dont_count ) ++dont_counts; - return sources.push_back(s); + + sources.push_back(s); + } + +void Manager::Register(PktSrc* src) + { + pkt_srcs.push_back(src); + Register(src, false); + } + +static std::pair split_prefix(std::string path) + { + // See if the path comes with a prefix telling us which type of + // PktSrc to use. If not, choose default. + std::string prefix; + + std::string::size_type i = path.find(":"); + if ( i != std::string::npos ) + { + prefix = path.substr(0, i); + path = path.substr(++i, std::string::npos); + } + + else + prefix= DEFAULT_PREFIX; + + return std::make_pair(prefix, path); + } + +PktSrc* Manager::OpenPktSrc(const std::string& path, const std::string& filter, bool is_live) + { + std::pair t = split_prefix(path); + std::string prefix = t.first; + std::string npath = t.second; + + // Find the component providing packet sources of the requested prefix. + + pktsrc::SourceComponent* component = 0; + + std::list all_components = plugin_mgr->Components(); + + for ( std::list::const_iterator i = all_components.begin(); + i != all_components.end(); i++ ) + { + pktsrc::SourceComponent* c = *i; + + if ( c->Prefix() == prefix && + (( is_live && c->DoesLive() ) || + (! is_live && c->DoesTrace())) ) + { + component = c; + break; + } + } + + + if ( ! component ) + reporter->FatalError("type of packet source '%s' not recognized", prefix.c_str()); + + // Instantiate packet source. + + PktSrc* ps = (*component->Factory())(path, filter, is_live); + + if ( ! (ps && ps->IsOpen()) ) + { + string type = (is_live ? "interface" : "trace file"); + string pserr = ps->ErrorMsg() ? (string(" - ") + ps->ErrorMsg()) : ""; + + reporter->FatalError("%s: problem with %s %s%s\n", + prog, path.c_str(), type.c_str(), pserr.c_str()); + } + + DBG_LOG(DBG_PKTIO, "Created packet source of type %s for %s\n", component->Name(), path.c_str()); + + Register(ps); + return ps; + } + + +PktDumper* Manager::OpenPktDumper(const string& path, bool append) + { + std::pair t = split_prefix(path); + std::string prefix = t.first; + std::string npath = t.second; + + // Find the component providing packet dumpers of the requested prefix. + + pktsrc::DumperComponent* component = 0; + + std::list all_components = plugin_mgr->Components(); + + for ( std::list::const_iterator i = all_components.begin(); + i != all_components.end(); i++ ) + { + if ( (*i)->Prefix() == prefix ) + { + component = (*i); + break; + } + } + + if ( ! component ) + reporter->FatalError("type of packet dumper '%s' not recognized", prefix.c_str()); + + // Instantiate packet dumper. + + PktDumper* pd = (*component->Factory())(path, append); + + if ( ! (pd && pd->IsOpen()) ) + { + string pderr = pd->ErrorMsg().size() ? (string(" - ") + pd->ErrorMsg()) : ""; + + reporter->FatalError("%s: can't open write file \"%s\"%s\n", + prog, path.c_str(), pderr.c_str()); + } + + DBG_LOG(DBG_PKTIO, "Created packer dumper of type %s for %s\n", component->Name(), path.c_str()); + + return pd; } diff --git a/src/iosource/Manager.h b/src/iosource/Manager.h new file mode 100644 index 0000000000..5a3a58d798 --- /dev/null +++ b/src/iosource/Manager.h @@ -0,0 +1,75 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef IOSOURCE_MANAGER_H +#define IOSOURCE_MANAGER_H + +#include +#include + +namespace iosource { + +class IOSource; +class PktSrc; +class PktDumper; + +class Manager { +public: + Manager() { call_count = 0; dont_counts = 0; } + ~Manager(); + + // If dont_count is true, this source does not contribute to the + // number of IOSources returned by Size(). The effect is that + // if all sources but the non-counting ones have gone dry, + // processing will shut down. + void Register(IOSource* src, bool dont_count = false); + + // This may block for some time. + IOSource* FindSoonest(double* ts); + + int Size() const { return sources.size() - dont_counts; } + + typedef std::list PktSrcList; + const PktSrcList& GetPktSrcs() const { return pkt_srcs; } + + // Terminate IOSource processing immediately by removing all + // sources (and therefore returning a Size() of zero). + void Terminate() { RemoveAll(); } + + PktSrc* OpenPktSrc(const std::string& path, const std::string& filter, bool is_live); + PktDumper* OpenPktDumper(const std::string& path, bool append); + +protected: + void Register(PktSrc* src); + + // When looking for a source with something to process, + // every SELECT_FREQUENCY calls we will go ahead and + // block on a select(). + static const int SELECT_FREQUENCY = 25; + + // Microseconds to wait in an empty select if no source is ready. + static const int SELECT_TIMEOUT = 50; + + void RemoveAll(); + + unsigned int call_count; + int dont_counts; + + struct Source { + IOSource* src; + int fd_read; + int fd_write; + int fd_except; + }; + + typedef std::list SourceList; + SourceList sources; + + PktSrcList pkt_srcs; +}; + +} + +extern iosource::Manager* iosource_mgr; + +#endif + diff --git a/src/iosource/pktsrc/CMakeLists.txt b/src/iosource/pktsrc/CMakeLists.txt new file mode 100644 index 0000000000..07303b46a3 --- /dev/null +++ b/src/iosource/pktsrc/CMakeLists.txt @@ -0,0 +1,2 @@ + +add_subdirectory(pcap) diff --git a/src/iosource/pktsrc/Component.cc b/src/iosource/pktsrc/Component.cc new file mode 100644 index 0000000000..22c49feed0 --- /dev/null +++ b/src/iosource/pktsrc/Component.cc @@ -0,0 +1,130 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include "Component.h" + +#include "../Desc.h" + +using namespace iosource::pktsrc; + +SourceComponent::SourceComponent(const std::string& arg_name, const std::string& arg_prefix, InputType arg_type, factory_callback arg_factory) + : iosource::Component(plugin::component::PKTSRC, arg_name) + { + prefix = arg_prefix; + type = arg_type; + factory = arg_factory; + } + +SourceComponent::SourceComponent(const SourceComponent& other) + : iosource::Component(other) + { + prefix = other.prefix; + type = other.type; + factory = other.factory; + } + +SourceComponent::~SourceComponent() + { + } + +const std::string& SourceComponent::Prefix() const + { + return prefix; + } + +bool SourceComponent::DoesLive() const + { + return type == LIVE || type == BOTH; + } + +bool SourceComponent::DoesTrace() const + { + return type == TRACE || type == BOTH; + } + +SourceComponent::factory_callback SourceComponent::Factory() const + { + return factory; + } + + +void SourceComponent::Describe(ODesc* d) const + { + iosource::Component::Describe(d); + + d->Add(" (interface prefix: "); + d->Add(prefix); + d->Add(")"); + } + +SourceComponent& SourceComponent::operator=(const SourceComponent& other) + { + iosource::Component::operator=(other); + + if ( &other != this ) + { + prefix = other.prefix; + type = other.type; + factory = other.factory; + } + + return *this; + } + +DumperComponent::DumperComponent(const std::string& arg_name, const std::string& arg_prefix, factory_callback arg_factory) + : plugin::Component(plugin::component::PKTDUMPER) + { + name = arg_name; + factory = arg_factory; + prefix = arg_prefix; + } + +DumperComponent::DumperComponent(const DumperComponent& other) + : plugin::Component(other) + { + name = other.name; + factory = other.factory; + prefix = other.prefix; + } + +DumperComponent::~DumperComponent() + { + } + +DumperComponent::factory_callback DumperComponent::Factory() const + { + return factory; + } + +const char* DumperComponent::Name() const + { + return name.c_str(); + } + +const std::string& DumperComponent::Prefix() const + { + return prefix; + } + +void DumperComponent::Describe(ODesc* d) const + { + plugin::Component::Describe(d); + + d->Add(name); + d->Add(" (dumper prefix: "); + d->Add(prefix); + d->Add(")"); + } + +DumperComponent& DumperComponent::operator=(const DumperComponent& other) + { + plugin::Component::operator=(other); + + if ( &other != this ) + { + name = other.name; + factory = other.factory; + prefix = other.prefix; + } + + return *this; + } diff --git a/src/iosource/pktsrc/Component.h b/src/iosource/pktsrc/Component.h new file mode 100644 index 0000000000..2a62fb5503 --- /dev/null +++ b/src/iosource/pktsrc/Component.h @@ -0,0 +1,132 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef IOSOURCE_PKTSRC_PLUGIN_COMPONENT_H +#define IOSOURCE_PKTSRC_PLUGIN_COMPONENT_H + +#include "../Component.h" + +namespace iosource { + +class PktSrc; +class PktDumper; + +namespace pktsrc { + +/** + * Component description for plugins providing a PktSrc for packet input. + */ +class SourceComponent : public iosource::Component { +public: + enum InputType { LIVE, TRACE, BOTH }; + + typedef PktSrc* (*factory_callback)(const std::string& path, const std::string& filter, bool is_live); + + /** + * XXX + */ + SourceComponent(const std::string& name, const std::string& prefix, InputType type, factory_callback factory); + + /** + * Copy constructor. + */ + SourceComponent(const SourceComponent& other); + + /** + * Destructor. + */ + virtual ~SourceComponent(); + + /** + * Returns the prefix passes to the constructor. + */ + const std::string& Prefix() const; + + /** + * Returns true if packet source instantiated by the component handle + * live traffic. + */ + bool DoesLive() const; + + /** + * Returns true if packet source instantiated by the component handle + * offline traces. + */ + bool DoesTrace() const; + + /** + * Returns the source's factory function. + */ + factory_callback Factory() const; + + /** + * Generates a human-readable description of the component. This goes + * into the output of \c "bro -NN". + */ + virtual void Describe(ODesc* d) const; + + SourceComponent& operator=(const SourceComponent& other); + +private: + std::string prefix; + InputType type; + factory_callback factory; +}; + +/** + * Component description for plugins providing a PktDumper for packet output. + * + * PktDumpers aren't IOSurces but we locate them here to keep them along with + * the PktSrc. + */ +class DumperComponent : public plugin::Component { +public: + typedef PktDumper* (*factory_callback)(const std::string& path, bool append); + + /** + * XXX + */ + DumperComponent(const std::string& name, const std::string& prefix, factory_callback factory); + + /** + * Copy constructor. + */ + DumperComponent(const DumperComponent& other); + + /** + * Destructor. + */ + ~DumperComponent(); + + /** + * XXX + */ + virtual const char* Name() const; + + /** + * Returns the prefix passes to the constructor. + */ + const std::string& Prefix() const; + + /** + * Returns the source's factory function. + */ + factory_callback Factory() const; + + /** + * Generates a human-readable description of the component. This goes + * into the output of \c "bro -NN". + */ + virtual void Describe(ODesc* d) const; + + DumperComponent& operator=(const DumperComponent& other); + +private: + std::string name; + std::string prefix; + factory_callback factory; +}; + +} +} + +#endif diff --git a/src/iosource/pktsrc/PktDumper.cc b/src/iosource/pktsrc/PktDumper.cc new file mode 100644 index 0000000000..21ad79b87d --- /dev/null +++ b/src/iosource/pktsrc/PktDumper.cc @@ -0,0 +1,79 @@ + +// See the file "COPYING" in the main distribution directory for copyright. + +#include +#include + +#include "config.h" + +#include "PktDumper.h" + +using namespace iosource; + +PktDumper::PktDumper() + { + is_open = false; + errmsg = ""; + } + +PktDumper::~PktDumper() + { + } + +const std::string& PktDumper::Path() const + { + return props.path; + } + +bool PktDumper::IsOpen() const + { + return is_open; + } + +double PktDumper::OpenTime() const + { + return is_open ? props.open_time : 0; + } + +bool PktDumper::IsError() const + { + return errmsg.size(); + } + +const std::string& PktDumper::ErrorMsg() const + { + return errmsg; + } + +int PktDumper::HdrSize() const + { + return is_open ? props.hdr_size : -1; + } + +bool PktDumper::Record(const Packet* pkt) + { + return Dump(pkt); + } + +void PktDumper::Opened(const Properties& arg_props) + { + is_open = true; + props = arg_props; + DBG_LOG(DBG_PKTIO, "Opened dumper %s", props.path.c_str()); + } + +void PktDumper::Closed() + { + is_open = false; + props.path = ""; + DBG_LOG(DBG_PKTIO, "Closed dumper %s", props.path.c_str()); + } + +void PktDumper::Error(const std::string& msg) + { + errmsg = msg; + + DBG_LOG(DBG_PKTIO, "Error with dumper %s: %s", + IsOpen() ? props.path.c_str() : "", + msg.c_str()); + } diff --git a/src/iosource/pktsrc/PktDumper.h b/src/iosource/pktsrc/PktDumper.h new file mode 100644 index 0000000000..b8f3595e32 --- /dev/null +++ b/src/iosource/pktsrc/PktDumper.h @@ -0,0 +1,57 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef IOSOURCE_PKTSRC_PKTDUMPER_H +#define IOSOURCE_PKTSRC_PKTDUMPER_H + +#include "../IOSource.h" + +namespace iosource { + +class PktDumper { +public: + struct Packet { + const struct pcap_pkthdr* hdr; + const u_char* data; + }; + + PktDumper(); + virtual ~PktDumper(); + + const std::string& Path() const; + bool IsOpen() const; + double OpenTime() const; + bool IsError() const; + const std::string& ErrorMsg() const; + int HdrSize() const; + bool Record(const Packet* pkt); + + virtual void Close() = 0; + virtual void Open() = 0; + +protected: + // Methods to use by derived classed. + // + struct Properties { + std::string path; + int hdr_size; + double open_time; + }; + + void Opened(const Properties& props); + void Closed(); + void Error(const std::string& msg); + + // PktSrc interface for derived classes to implement. + + virtual bool Dump(const Packet* pkt) = 0; + +private: + bool is_open; + Properties props; + + std::string errmsg; +}; + +} + +#endif diff --git a/src/iosource/pktsrc/PktSrc.cc b/src/iosource/pktsrc/PktSrc.cc new file mode 100644 index 0000000000..703a2d634b --- /dev/null +++ b/src/iosource/pktsrc/PktSrc.cc @@ -0,0 +1,411 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include +#include + +#include "config.h" + +#include "util.h" +#include "PktSrc.h" +#include "Hash.h" +#include "Net.h" +#include "Sessions.h" + +using namespace iosource; + +PktSrc::PktSrc() + { + have_packet = false; + errbuf = ""; + + next_sync_point = 0; + first_timestamp = 0.0; + first_wallclock = current_wallclock = 0; + } + +PktSrc::~PktSrc() + { + } + +const std::string& PktSrc::Path() const + { + static std::string not_open("not open"); + return IsOpen() ? props.path : not_open; + } + +const char* PktSrc::ErrorMsg() const + { + return errbuf.c_str(); + } + +int PktSrc::LinkType() const + { + return IsOpen() ? props.link_type : -1; + } + +int PktSrc::HdrSize() const + { + return IsOpen() ? props.hdr_size : -1; + } + +int PktSrc::SnapLen() const + { + return snaplen; // That's a global. Change? + } + +bool PktSrc::IsLive() const + { + return props.is_live; + } + +double PktSrc::CurrentPacketTimestamp() + { + return current_pseudo; + } + +double PktSrc::CurrentPacketWallClock() + { + // We stop time when we are suspended. + if ( net_is_processing_suspended() ) + current_wallclock = current_time(true); + + return current_wallclock; + } + +void PktSrc::Opened(const Properties& arg_props) + { + props = arg_props; + SetClosed(false); + + DBG_LOG(DBG_PKTIO, "Opened source %s", props.path.c_str()); + } + +void PktSrc::Closed() + { + SetClosed(true); + + DBG_LOG(DBG_PKTIO, "Closed source %s", props.path.c_str()); + } + +void PktSrc::Error(const std::string& msg) + { + // We don't report this immediately, Bro will ask us for the error + // once it notices we aren't open. + errbuf = msg; + DBG_LOG(DBG_PKTIO, "Error with source %s: %s", + IsOpen() ? props.path.c_str() : "", + msg.c_str()); + } + +void PktSrc::Info(const std::string& msg) + { + reporter->Info("%s", msg.c_str()); + } + +void PktSrc::Weird(const std::string& msg, const Packet* p) + { + sessions->Weird(msg.c_str(), p->hdr, p->data, 0); + } + +void PktSrc::InternalError(const std::string& msg) + { + reporter->InternalError("%s", msg.c_str()); + } + +void PktSrc::ContinueAfterSuspend() + { + current_wallclock = current_time(true); + } + +int PktSrc::GetLinkHeaderSize(int link_type) + { + switch ( link_type ) { + case DLT_NULL: + return 4; + + case DLT_EN10MB: + return 14; + + case DLT_FDDI: + return 13 + 8; // fddi_header + LLC + +#ifdef DLT_LINUX_SLL + case DLT_LINUX_SLL: + return 16; +#endif + + case DLT_PPP_SERIAL: // PPP_SERIAL + return 4; + + case DLT_RAW: + return 0; + } + + return -1; + } + +double PktSrc::CheckPseudoTime() + { + if ( ! IsOpen() ) + return 0; + + if ( ! ExtractNextPacketInternal() ) + return 0; + + if ( remote_trace_sync_interval ) + { + if ( next_sync_point == 0 || current_packet.ts >= next_sync_point ) + { + int n = remote_serializer->SendSyncPoint(); + next_sync_point = first_timestamp + + n * remote_trace_sync_interval; + remote_serializer->Log(RemoteSerializer::LogInfo, + fmt("stopping at packet %.6f, next sync-point at %.6f", + current_packet.ts, next_sync_point)); + + return 0; + } + } + + double pseudo_time = current_packet.ts - first_timestamp; + double ct = (current_time(true) - first_wallclock) * pseudo_realtime; + + return pseudo_time <= ct ? bro_start_time + pseudo_time : 0; + } + +void PktSrc::Init() + { + Open(); + } + +void PktSrc::Done() + { + Close(); + } + +void PktSrc::GetFds(int* read, int* write, int* except) + { + if ( pseudo_realtime ) + { + // Select would give erroneous results. But we simulate it + // by setting idle accordingly. + SetIdle(CheckPseudoTime() == 0); + return; + } + + if ( IsOpen() && props.selectable_fd >= 0 ) + *read = props.selectable_fd; + } + +double PktSrc::NextTimestamp(double* local_network_time) + { + if ( ! IsOpen() ) + return -1.0; + + if ( ! ExtractNextPacketInternal() ) + return -1.0; + + if ( pseudo_realtime ) + { + // Delay packet if necessary. + double packet_time = CheckPseudoTime(); + if ( packet_time ) + return packet_time; + + SetIdle(true); + return -1.0; + } + + return current_packet.ts; + } + +void PktSrc::Process() + { + if ( ! IsOpen() ) + return; + + if ( ! ExtractNextPacketInternal() ) + return; + + int pkt_hdr_size = props.hdr_size; + + // Unfortunately some packets on the link might have MPLS labels + // while others don't. That means we need to ask the link-layer if + // labels are in place. + bool have_mpls = false; + + int protocol = 0; + const u_char* data = current_packet.data; + + switch ( props.link_type ) { + case DLT_NULL: + { + protocol = (data[3] << 24) + (data[2] << 16) + (data[1] << 8) + data[0]; + + // From the Wireshark Wiki: "AF_INET6, unfortunately, has + // different values in {NetBSD,OpenBSD,BSD/OS}, + // {FreeBSD,DragonFlyBSD}, and {Darwin/Mac OS X}, so an IPv6 + // packet might have a link-layer header with 24, 28, or 30 + // as the AF_ value." As we may be reading traces captured on + // platforms other than what we're running on, we accept them + // all here. + if ( protocol != AF_INET + && protocol != AF_INET6 + && protocol != 24 + && protocol != 28 + && protocol != 30 ) + { + Weird("non_ip_packet_in_null_transport", ¤t_packet); + data = 0; + return; + } + + break; + } + + case DLT_EN10MB: + { + // Get protocol being carried from the ethernet frame. + protocol = (data[12] << 8) + data[13]; + + switch ( protocol ) + { + // MPLS carried over the ethernet frame. + case 0x8847: + have_mpls = true; + break; + + // VLAN carried over the ethernet frame. + case 0x8100: + data += GetLinkHeaderSize(props.link_type); + data += 4; // Skip the vlan header + pkt_hdr_size = 0; + + // Check for 802.1ah (Q-in-Q) containing IP. + // Only do a second layer of vlan tag + // stripping because there is no + // specification that allows for deeper + // nesting. + if ( ((data[2] << 8) + data[3]) == 0x0800 ) + data += 4; + + break; + + // PPPoE carried over the ethernet frame. + case 0x8864: + data += GetLinkHeaderSize(props.link_type); + protocol = (data[6] << 8) + data[7]; + data += 8; // Skip the PPPoE session and PPP header + pkt_hdr_size = 0; + + if ( protocol != 0x0021 && protocol != 0x0057 ) + { + // Neither IPv4 nor IPv6. + Weird("non_ip_packet_in_pppoe_encapsulation", ¤t_packet); + data = 0; + return; + } + break; + } + + break; + } + + case DLT_PPP_SERIAL: + { + // Get PPP protocol. + protocol = (data[2] << 8) + data[3]; + + if ( protocol == 0x0281 ) + // MPLS Unicast + have_mpls = true; + + else if ( protocol != 0x0021 && protocol != 0x0057 ) + { + // Neither IPv4 nor IPv6. + Weird("non_ip_packet_in_ppp_encapsulation", ¤t_packet); + data = 0; + return; + } + break; + } + } + + if ( have_mpls ) + { + // Remove the data link layer + data += GetLinkHeaderSize(props.link_type); + + // Denote a header size of zero before the IP header + pkt_hdr_size = 0; + + // Skip the MPLS label stack. + bool end_of_stack = false; + + while ( ! end_of_stack ) + { + end_of_stack = *(data + 2) & 0x01; + data += 4; + } + } + + if ( pseudo_realtime ) + { + current_pseudo = CheckPseudoTime(); + net_packet_arrival(current_pseudo, current_packet.hdr, current_packet.data, pkt_hdr_size, this); + if ( ! first_wallclock ) + first_wallclock = current_time(true); + } + + else + net_packet_arrival(current_packet.ts, current_packet.hdr, current_packet.data, pkt_hdr_size, this); + + have_packet = 0; + DoneWithPacket(¤t_packet); + } + +const char* PktSrc::Tag() + { + return "PktSrc"; + } + +int PktSrc::ExtractNextPacketInternal() + { + if ( have_packet ) + return true; + + have_packet = false; + + // Don't return any packets if processing is suspended (except for the + // very first packet which we need to set up times). + if ( net_is_processing_suspended() && first_timestamp ) + { + SetIdle(true); + return 0; + } + + if ( pseudo_realtime ) + current_wallclock = current_time(true); + + if ( ExtractNextPacket(¤t_packet) ) + { + if ( ! first_timestamp ) + first_timestamp = current_packet.ts; + + have_packet = true; + return 1; + } + + if ( pseudo_realtime && using_communication && ! IsOpen() ) + { + // Source has gone dry, we're done. + if ( remote_trace_sync_interval ) + remote_serializer->SendFinalSyncPoint(); + else + remote_serializer->Terminate(); + } + + SetIdle(true); + return 0; + } + diff --git a/src/iosource/pktsrc/PktSrc.h b/src/iosource/pktsrc/PktSrc.h new file mode 100644 index 0000000000..3c3436bb19 --- /dev/null +++ b/src/iosource/pktsrc/PktSrc.h @@ -0,0 +1,140 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef IOSOURCE_PKTSRC_PKTSRC_H +#define IOSOURCE_PKTSRC_PKTSRC_H + +#include "../IOSource.h" + +struct pcap_pkthdr; + +namespace iosource { + +class PktSrc : public IOSource { +public: + struct Stats { + unsigned int received; // pkts received (w/o drops) + unsigned int dropped; // pkts dropped + unsigned int link; // total packets on link + // (not always not available) + // + Stats() { received = dropped = link = 0; } + }; + + PktSrc(); + virtual ~PktSrc(); + + const std::string& Path() const; + const std::string& Filter() const; + bool IsLive() const; + int LinkType() const; + const char* ErrorMsg() const; + int HdrSize() const; + int SnapLen() const; + + // Only valid in pseudo-realtime mode. + double CurrentPacketTimestamp(); + double CurrentPacketWallClock(); + + // Signal packet source that processing was suspended and is now + // going to be continued. + void ContinueAfterSuspend(); + + virtual void Statistics(Stats* stats) = 0; + + // Returns the packet last processed; false if there is no + // current packet available. + virtual bool GetCurrentPacket(const pcap_pkthdr** hdr, const u_char** pkt) = 0; + + // Precompiles a filter and associates the given index with it. + // Returns true on success, 0 if a problem occurred or filtering is + // not supported. + virtual int PrecompileFilter(int index, const std::string& filter) = 0; + + // Activates the filter with the given index. Returns true on + // success, 0 if a problem occurred or the filtering is not + // supported. + virtual int SetFilter(int index) = 0; + + static int GetLinkHeaderSize(int link_type); + +#if 0 + PktSrc_Filter_Type FilterType() const { return filter_type; } + + void AddSecondaryTablePrograms(); + const secondary_program_list& ProgramTable() const + { return program_list; } +#endif + +protected: + // Methods to use by derived classes. + + struct Properties { + std::string path; + std::string filter; // Maybe different than what's passed in if not (directly) supported. + int selectable_fd; + int link_type; + int hdr_size; + bool is_live; + }; + + struct Packet { + double ts; + const struct ::pcap_pkthdr* hdr; + const u_char* data; + }; + + void Opened(const Properties& props); + void Closed(); + void Info(const std::string& msg); + void Error(const std::string& msg); + void Weird(const std::string& msg, const Packet* pkt); + void InternalError(const std::string& msg); + + // PktSrc interface for derived classes to implement. + + virtual void Open() = 0; + virtual void Close() = 0; + // Returns 1 on success, 0 on time-out/gone dry. + virtual int ExtractNextPacket(Packet* pkt) = 0; + virtual void DoneWithPacket(Packet* pkt) = 0; + +private: + // Checks if the current packet has a pseudo-time <= current_time. + // If yes, returns pseudo-time, otherwise 0. + double CheckPseudoTime(); + + // XXX + int ExtractNextPacketInternal(); + + // IOSource interface implementation. + virtual void Init(); + virtual void Done(); + virtual void GetFds(int* read, int* write, int* except); + virtual double NextTimestamp(double* local_network_time); + virtual void Process(); + virtual const char* Tag(); + + Properties props; + + bool have_packet; + Packet current_packet; + + // Only set in pseudo-realtime mode. + double first_timestamp; + double first_wallclock; + double current_wallclock; + double current_pseudo; + double next_sync_point; // For trace synchronziation in pseudo-realtime + + std::string errbuf; + +#if 0 + PktSrc_Filter_Type filter_type; // normal path or secondary path + secondary_program_list program_list; +#endif +}; + +} + + +#endif diff --git a/src/iosource/pktsrc/old-2ndary-code.h b/src/iosource/pktsrc/old-2ndary-code.h new file mode 100644 index 0000000000..0b47cccdc5 --- /dev/null +++ b/src/iosource/pktsrc/old-2ndary-code.h @@ -0,0 +1,69 @@ +// Whether a PktSrc object is used by the normal filter structure or the +// secondary-path structure. +typedef enum { + TYPE_FILTER_NORMAL, // the normal filter + TYPE_FILTER_SECONDARY, // the secondary-path filter +} PktSrc_Filter_Type; + +// {filter,event} tuples conforming the secondary path. +class SecondaryEvent { +public: + SecondaryEvent(const char* arg_filter, Func* arg_event) + { + filter = arg_filter; + event = arg_event; + } + + const char* Filter() { return filter; } + Func* Event() { return event; } + +private: + const char* filter; + Func* event; +}; + +declare(PList,SecondaryEvent); +typedef PList(SecondaryEvent) secondary_event_list; + +class SecondaryPath { +public: + SecondaryPath(); + ~SecondaryPath(); + + secondary_event_list& EventTable() { return event_list; } + const char* Filter() { return filter; } + +private: + secondary_event_list event_list; + // OR'ed union of all SecondaryEvent filters + char* filter; +}; + +// Main secondary-path object. +extern SecondaryPath* secondary_path; + +// {program, {filter,event}} tuple table. +class SecondaryProgram { +public: + SecondaryProgram(BPF_Program* arg_program, SecondaryEvent* arg_event) + { + program = arg_program; + event = arg_event; + } + + ~SecondaryProgram(); + + BPF_Program* Program() { return program; } + SecondaryEvent* Event() { return event; } + +private: + // Associated program. + BPF_Program *program; + + // Event that is run in case the program is matched. + SecondaryEvent* event; +}; + +declare(PList,SecondaryProgram); +typedef PList(SecondaryProgram) secondary_program_list; + diff --git a/src/BPF_Program.cc b/src/iosource/pktsrc/pcap/BPF_Program.cc similarity index 100% rename from src/BPF_Program.cc rename to src/iosource/pktsrc/pcap/BPF_Program.cc diff --git a/src/BPF_Program.h b/src/iosource/pktsrc/pcap/BPF_Program.h similarity index 100% rename from src/BPF_Program.h rename to src/iosource/pktsrc/pcap/BPF_Program.h diff --git a/src/iosource/pktsrc/pcap/CMakeLists.txt b/src/iosource/pktsrc/pcap/CMakeLists.txt new file mode 100644 index 0000000000..b43d51b0ca --- /dev/null +++ b/src/iosource/pktsrc/pcap/CMakeLists.txt @@ -0,0 +1,8 @@ + +include(BroPlugin) + +include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) + +bro_plugin_begin(Bro Pcap) +bro_plugin_cc(Source.cc Dumper.cc BPF_Program.cc Plugin.cc) +bro_plugin_end() diff --git a/src/iosource/pktsrc/pcap/Dumper.cc b/src/iosource/pktsrc/pcap/Dumper.cc new file mode 100644 index 0000000000..ad1ac2fa73 --- /dev/null +++ b/src/iosource/pktsrc/pcap/Dumper.cc @@ -0,0 +1,111 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#include +#include + +#include "Dumper.h" +#include "../PktSrc.h" + +using namespace iosource::pktsrc; + +PcapDumper::PcapDumper(const std::string& path, bool arg_append) + { + append = arg_append; + props.path = path; + dumper = 0; + pd = 0; + } + +PcapDumper::~PcapDumper() + { + } + +void PcapDumper::Open() + { + int linktype = -1; + + pd = pcap_open_dead(DLT_EN10MB, 8192); + if ( ! pd ) + { + Error("error for pcap_open_dead"); + return; + } + + if ( props.path.empty() ) + { + Error("no filename given"); + return; + } + + struct stat s; + int exists = 0; + + if ( append ) + { + // See if output file already exists (and is non-empty). + exists = stat(props.path.c_str(), &s); ; + + if ( exists < 0 && errno != ENOENT ) + { + Error(fmt("can't stat file %s: %s", props.path.c_str(), strerror(errno))); + return; + } + } + + if ( ! append || exists < 0 || s.st_size == 0 ) + { + // Open new file. + dumper = pcap_dump_open(pd, props.path.c_str()); + if ( ! dumper ) + { + Error(pcap_geterr(pd)); + return; + } + } + + else + { + // Old file and we need to append, which, unfortunately, + // is not supported by libpcap. So, we have to hack a + // little bit, knowing that pcap_dumpter_t is, in fact, + // a FILE ... :-( + dumper = (pcap_dumper_t*) fopen(props.path.c_str(), "a"); + if ( ! dumper ) + { + Error(fmt("can't open dump %s: %s", props.path.c_str(), strerror(errno))); + return; + } + } + + props.open_time = network_time; + props.hdr_size = PktSrc::GetLinkHeaderSize(pcap_datalink(pd)); + Opened(props); + } + +void PcapDumper::Close() + { + if ( ! dumper ) + return; + + pcap_dump_close(dumper); + pcap_close(pd); + dumper = 0; + pd = 0; + + Closed(); + } + +bool PcapDumper::Dump(const Packet* pkt) + { + if ( ! dumper ) + return false; + + pcap_dump((u_char*) dumper, pkt->hdr, pkt->data); + + return true; + } + +iosource::PktDumper* PcapDumper::Instantiate(const std::string& path, bool append) + { + return new PcapDumper(path, append); + } diff --git a/src/iosource/pktsrc/pcap/Dumper.h b/src/iosource/pktsrc/pcap/Dumper.h new file mode 100644 index 0000000000..c2762a2b04 --- /dev/null +++ b/src/iosource/pktsrc/pcap/Dumper.h @@ -0,0 +1,40 @@ + +#ifndef IOSOURCE_PKTSRC_PCAP_DUMPER_H +#define IOSOURCE_PKTSRC_PCAP_DUMPER_H + +extern "C" { +#include +} + +#include "../PktDumper.h" + +namespace iosource { +namespace pktsrc { + +class PcapDumper : public PktDumper { +public: + PcapDumper(const std::string& path, bool append); + virtual ~PcapDumper(); + + static PktDumper* Instantiate(const std::string& path, bool appen); + +protected: + // PktDumper interface. + virtual void Open(); + virtual void Close(); + virtual bool Dump(const Packet* pkt); + +private: + Properties props; + + bool append; + pcap_dumper_t* dumper; + pcap_t* pd; +}; + +} +} + +#endif + + diff --git a/src/iosource/pktsrc/pcap/Plugin.cc b/src/iosource/pktsrc/pcap/Plugin.cc new file mode 100644 index 0000000000..307f2da99f --- /dev/null +++ b/src/iosource/pktsrc/pcap/Plugin.cc @@ -0,0 +1,12 @@ + +#include "plugin/Plugin.h" +#include "iosource/pktsrc/Component.h" + +#include "Source.h" +#include "Dumper.h" + +BRO_PLUGIN_BEGIN(Bro, Pcap) + BRO_PLUGIN_DESCRIPTION("Packet I/O via libpcap"); + BRO_PLUGIN_PKTSRC("PcapReader", "pcap", SourceComponent::BOTH, PcapSource); + BRO_PLUGIN_PKTDUMPER("PcapTraceWriter", "pcap", PcapDumper); +BRO_PLUGIN_END diff --git a/src/iosource/pktsrc/pcap/Source.cc b/src/iosource/pktsrc/pcap/Source.cc new file mode 100644 index 0000000000..86c0273adf --- /dev/null +++ b/src/iosource/pktsrc/pcap/Source.cc @@ -0,0 +1,343 @@ + +#include "config.h" + +#include "Source.h" + +#ifdef HAVE_PCAP_INT_H +#include +#endif + +using namespace iosource::pktsrc; + +PcapSource::~PcapSource() + { + Close(); + } + +PcapSource::PcapSource(const std::string& path, const std::string& filter, bool is_live) + { + props.path = path; + props.filter = filter; + props.is_live = is_live; + last_data = 0; + } + +void PcapSource::Open() + { + if ( props.is_live ) + OpenLive(); + else + OpenOffline(); + } + +void PcapSource::Close() + { + if ( ! pd ) + return; + + BPF_Program* code; + IterCookie* cookie = filters.InitForIteration(); + while ( (code = filters.NextEntry(cookie)) ) + delete code; + + filters.Clear(); + + pcap_close(pd); + pd = 0; + last_data = 0; + + Closed(); + } + +void PcapSource::OpenLive() + { + char errbuf[PCAP_ERRBUF_SIZE]; + char tmp_errbuf[PCAP_ERRBUF_SIZE]; + +#if 0 + filter_type = ft; +#endif + + // Determine interface if not specified. + if ( props.path.empty() ) + props.path = pcap_lookupdev(tmp_errbuf); + + if ( props.path.empty() ) + { + safe_snprintf(errbuf, sizeof(errbuf), + "pcap_lookupdev: %s", tmp_errbuf); + Error(errbuf); + return; + } + + // Determine network and netmask. + uint32 net; + if ( pcap_lookupnet(props.path.c_str(), &net, &netmask, tmp_errbuf) < 0 ) + { + // ### The lookup can fail if no address is assigned to + // the interface; and libpcap doesn't have any useful notion + // of error codes, just error std::strings - how bogus - so we + // just kludge around the error :-(. + // sprintf(errbuf, "pcap_lookupnet %s", tmp_errbuf); + // return; + netmask = 0xffffff00; + } + + // We use the smallest time-out possible to return almost immediately if + // no packets are available. (We can't use set_nonblocking() as it's + // broken on FreeBSD: even when select() indicates that we can read + // something, we may get nothing if the store buffer hasn't filled up + // yet.) + pd = pcap_open_live(props.path.c_str(), SnapLen(), 1, 1, tmp_errbuf); + + if ( ! pd ) + { + safe_snprintf(errbuf, sizeof(errbuf), + "pcap_open_live: %s", tmp_errbuf); + Error(errbuf); + return; + } + + // ### This needs autoconf'ing. +#ifdef HAVE_PCAP_INT_H + Info("pcap bufsize = %d\n", ((struct pcap *) pd)->bufsize); +#endif + +#ifdef HAVE_LINUX + if ( pcap_setnonblock(pd, 1, tmp_errbuf) < 0 ) + { + safe_snprintf(errbuf, sizeof(errbuf), + "pcap_setnonblock: %s", tmp_errbuf); + Error(errbuf); + pcap_close(pd); + return; + } +#endif + + props.selectable_fd = pcap_fileno(pd); + + if ( PrecompileFilter(0, props.filter) && SetFilter(0) ) + { + SetHdrSize(); + + if ( ! pd ) + // Was closed, couldn't get header size. + return; + + Info(fmt("listening on %s, capture length %d bytes\n", props.path.c_str(), SnapLen())); + } + else + Close(); + + props.is_live = true; + Opened(props); + } + +void PcapSource::OpenOffline() + { + char errbuf[PCAP_ERRBUF_SIZE]; + char tmp_errbuf[PCAP_ERRBUF_SIZE]; + +#if 0 + filter_type = ft; +#endif + + pd = pcap_open_offline(props.path.c_str(), errbuf); + + if ( ! pd ) + { + safe_snprintf(errbuf, sizeof(errbuf), + "pcap_open_offline: %s", tmp_errbuf); + Error(errbuf); + return; + } + + if ( PrecompileFilter(0, props.filter) && SetFilter(0) ) + { + SetHdrSize(); + + if ( ! pd ) + // Was closed, unknown link layer type. + return; + + // We don't put file sources into non-blocking mode as + // otherwise we would not be able to identify the EOF. + + props.selectable_fd = fileno(pcap_file(pd)); + + if ( props.selectable_fd < 0 ) + InternalError("OS does not support selectable pcap fd"); + } + + else + Close(); + + props.is_live = false; + Opened(props); + } + +int PcapSource::ExtractNextPacket(Packet* pkt) + { + const u_char* data = pcap_next(pd, ¤t_hdr); + + if ( ! data ) + { + // Source has gone dry. If it's a network interface, this just means + // it's timed out. If it's a file, though, then the file has been + // exhausted. + if ( ! props.is_live ) + Close(); + + return 0; + } + + pkt->ts = current_hdr.ts.tv_sec + double(current_hdr.ts.tv_usec) / 1e6; + pkt->hdr = ¤t_hdr; + pkt->data = last_data = data; + + if ( current_hdr.len == 0 || current_hdr.caplen == 0 ) + { + Weird("empty_pcap_header", pkt); + return 0; + } + + last_hdr = current_hdr; + last_data = data; + ++stats.received; + return 1; + } + +void PcapSource::DoneWithPacket(Packet* pkt) + { + // Nothing to do. + } + +int PcapSource::PrecompileFilter(int index, const std::string& filter) + { + char errbuf[PCAP_ERRBUF_SIZE]; + + // Compile filter. + BPF_Program* code = new BPF_Program(); + + if ( ! code->Compile(pd, filter.c_str(), netmask, errbuf, sizeof(errbuf)) ) + { + PcapError(); + delete code; + return 0; + } + + // Store it in hash. + HashKey* hash = new HashKey(HashKey(bro_int_t(index))); + BPF_Program* oldcode = filters.Lookup(hash); + if ( oldcode ) + delete oldcode; + + filters.Insert(hash, code); + delete hash; + + return 1; + } + +int PcapSource::SetFilter(int index) + { + char errbuf[PCAP_ERRBUF_SIZE]; + +#if 0 + // We don't want load-level filters for the secondary path. + if ( filter_type == TYPE_FILTER_SECONDARY && index > 0 ) + return 1; +#endif + + HashKey* hash = new HashKey(HashKey(bro_int_t(index))); + BPF_Program* code = filters.Lookup(hash); + delete hash; + + if ( ! code ) + { + safe_snprintf(errbuf, sizeof(errbuf), + "No precompiled pcap filter for index %d", + index); + Error(errbuf); + return 0; + } + + if ( pcap_setfilter(pd, code->GetProgram()) < 0 ) + { + PcapError(); + return 0; + } + +#ifndef HAVE_LINUX + // Linux doesn't clear counters when resetting filter. + stats.received = stats.dropped = stats.link = 0; +#endif + + return 1; + } + +void PcapSource::Statistics(Stats* s) + { + char errbuf[PCAP_ERRBUF_SIZE]; + + if ( ! props.is_live ) + s->received = s->dropped = s->link = 0; + + else + { + struct pcap_stat pstat; + if ( pcap_stats(pd, &pstat) < 0 ) + { + PcapError(); + s->received = s->dropped = s->link = 0; + } + + else + { + s->dropped = pstat.ps_drop; + s->link = pstat.ps_recv; + } + } + + s->received = stats.received; + + if ( ! props.is_live ) + s->dropped = 0; + } + +bool PcapSource::GetCurrentPacket(const pcap_pkthdr** hdr, const u_char** pkt) + { + if ( ! last_data ) + return false; + + *hdr = &last_hdr; + *pkt = last_data; + return true; + } + +void PcapSource::PcapError() + { + assert(pd); + Error(fmt("pcap_error: %s", pcap_geterr(pd))); + Close(); + } + +void PcapSource::SetHdrSize() + { + char errbuf[PCAP_ERRBUF_SIZE]; + + props.link_type = pcap_datalink(pd); + props.hdr_size = GetLinkHeaderSize(props.link_type); + + if ( props.hdr_size < 0 ) + { + safe_snprintf(errbuf, sizeof(errbuf), + "unknown data link type 0x%x", props.link_type); + Error(errbuf); + Close(); + } + } + +iosource::PktSrc* PcapSource::Instantiate(const std::string& path, const std::string& filter, bool is_live) + { + return new PcapSource(path, filter, is_live); + } diff --git a/src/iosource/pktsrc/pcap/Source.h b/src/iosource/pktsrc/pcap/Source.h new file mode 100644 index 0000000000..9f1d7c7eb8 --- /dev/null +++ b/src/iosource/pktsrc/pcap/Source.h @@ -0,0 +1,60 @@ +// See the file "COPYING" in the main distribution directory for copyright. + +#ifndef IOSOURCE_PKTSRC_PCAP_SOURCE_H +#define IOSOURCE_PKTSRC_PCAP_SOURCE_H + +extern "C" { +#include +} + +#include "../PktSrc.h" +#include "BPF_Program.h" +#include "Dict.h" + +declare(PDict,BPF_Program); + +namespace iosource { +namespace pktsrc { + +class PcapSource : public iosource::PktSrc { +public: + // XXX + PcapSource(const std::string& path, const std::string& filter, bool is_live); + + virtual ~PcapSource(); + + static PktSrc* Instantiate(const std::string& path, const std::string& filter, bool is_live); + +protected: + // PktSrc interface. + virtual void Open(); + virtual void Close(); + virtual int ExtractNextPacket(Packet* pkt); + virtual void DoneWithPacket(Packet* pkt); + virtual int PrecompileFilter(int index, const std::string& filter); + virtual int SetFilter(int index); + virtual void Statistics(Stats* stats); + virtual bool GetCurrentPacket(const pcap_pkthdr** hdr, const u_char** pkt); + +private: + void OpenLive(); + void OpenOffline(); + void PcapError(); + void SetHdrSize(); + + Properties props; + Stats stats; + + pcap_t *pd; + uint32 netmask; + PDict(BPF_Program) filters; + + struct pcap_pkthdr current_hdr; + struct pcap_pkthdr last_hdr; + const u_char* last_data; +}; + +} +} + +#endif diff --git a/src/main.cc b/src/main.cc index 53a0cb20ee..cf4bd423b6 100644 --- a/src/main.cc +++ b/src/main.cc @@ -62,6 +62,7 @@ extern "C" void OPENSSL_add_all_algorithms_conf(void); #include "plugin/Manager.h" #include "file_analysis/Manager.h" #include "broxygen/Manager.h" +#include "iosource/Manager.h" #include "binpac_bro.h" @@ -100,6 +101,7 @@ plugin::Manager* plugin_mgr = 0; analyzer::Manager* analyzer_mgr = 0; file_analysis::Manager* file_mgr = 0; broxygen::Manager* broxygen_mgr = 0; +iosource::Manager* iosource_mgr = 0; Stmt* stmts; EventHandlerPtr net_done = 0; RuleMatcher* rule_matcher = 0; @@ -116,7 +118,10 @@ int signal_val = 0; int optimize = 0; int do_notice_analysis = 0; int rule_bench = 0; +int generate_documentation = 0; +#if 0 SecondaryPath* secondary_path = 0; +#endif extern char version[]; char* command_line_policy = 0; vector params; @@ -379,7 +384,9 @@ void terminate_bro() delete event_serializer; delete state_serializer; delete event_registry; +#if 0 delete secondary_path; +#endif delete remote_serializer; delete analyzer_mgr; delete log_mgr; @@ -845,6 +852,7 @@ int main(int argc, char** argv) input_mgr = new input::Manager(); plugin_mgr = new plugin::Manager(); file_mgr = new file_analysis::Manager(); + iosource_mgr = new iosource::Manager(); plugin_mgr->InitPreScript(); analyzer_mgr->InitPreScript(); @@ -967,13 +975,15 @@ int main(int argc, char** argv) snaplen = internal_val("snaplen")->AsCount(); +#if 0 // Initialize the secondary path, if it's needed. secondary_path = new SecondaryPath(); +#endif if ( dns_type != DNS_PRIME ) net_init(interfaces, read_files, netflows, flow_files, writefile, "", - secondary_path->Filter(), do_watchdog); + "", do_watchdog); BroFile::SetDefaultRotation(log_rotate_interval, log_max_size); @@ -1132,9 +1142,9 @@ int main(int argc, char** argv) have_pending_timers = ! reading_traces && timer_mgr->Size() > 0; - io_sources.Register(thread_mgr, true); + iosource_mgr->Register(thread_mgr, true); - if ( io_sources.Size() > 0 || + if ( iosource_mgr->Size() > 0 || have_pending_timers || BifConst::exit_only_after_terminate ) { diff --git a/src/plugin/Component.cc b/src/plugin/Component.cc index ee18d55cdf..94de2c5446 100644 --- a/src/plugin/Component.cc +++ b/src/plugin/Component.cc @@ -43,6 +43,18 @@ void Component::Describe(ODesc* d) const d->Add("File Analyzer"); break; + case component::IOSOURCE: + d->Add("I/O Source"); + break; + + case component::PKTSRC: + d->Add("Packet Source"); + break; + + case component::PKTDUMPER: + d->Add("Packet Dumper"); + break; + default: reporter->InternalWarning("unknown component type in plugin::Component::Describe"); d->Add(""); diff --git a/src/plugin/Component.h b/src/plugin/Component.h index ad02dc7e4b..04248f54a9 100644 --- a/src/plugin/Component.h +++ b/src/plugin/Component.h @@ -3,6 +3,8 @@ #ifndef PLUGIN_COMPONENT_H #define PLUGIN_COMPONENT_H +#include + class ODesc; namespace plugin { @@ -16,7 +18,10 @@ enum Type { READER, /// An input reader (not currently used). WRITER, /// An logging writer (not currenly used). ANALYZER, /// A protocol analyzer. - FILE_ANALYZER /// A file analyzer. + FILE_ANALYZER, /// A file analyzer. + IOSOURCE, /// An I/O source, excluding packet sources. + PKTSRC, /// A packet source. + PKTDUMPER /// A packet dumper. }; } diff --git a/src/plugin/Macros.h b/src/plugin/Macros.h index 9362642e91..c2f8a2ed7a 100644 --- a/src/plugin/Macros.h +++ b/src/plugin/Macros.h @@ -130,4 +130,31 @@ #define BRO_PLUGIN_SUPPORT_ANALYZER(tag) \ AddComponent(new ::analyzer::Component(tag, 0)); +/** + * Defines a component implementing an IOSource (excluding packet sources). + * + * XXXX + * + */ +#define BRO_PLUGIN_IOSOURCE(tag, cls) \ + AddComponent(new ::iosource::pktsrc::Component(tag, iosource::cls::Instantiate)); + +/** + * Defines a component implementing a PktSrc. + * + * XXXX + * + */ +#define BRO_PLUGIN_PKTSRC(tag, prefix, type, cls) \ + AddComponent(new ::iosource::pktsrc::SourceComponent(tag, prefix, iosource::pktsrc::type, iosource::pktsrc::cls::Instantiate)); + +/** + * Defines a component implementing a PktDumper. + * + * XXXX + * + */ +#define BRO_PLUGIN_PKTDUMPER(tag, prefix, cls) \ + AddComponent(new ::iosource::pktsrc::DumperComponent(tag, prefix, iosource::pktsrc::cls::Instantiate)); + #endif diff --git a/src/threading/Manager.cc b/src/threading/Manager.cc index 4491cd42b5..fdc422bd1f 100644 --- a/src/threading/Manager.cc +++ b/src/threading/Manager.cc @@ -11,7 +11,7 @@ Manager::Manager() did_process = true; next_beat = 0; terminating = false; - idle = true; + SetIdle(true); } Manager::~Manager() @@ -47,8 +47,8 @@ void Manager::Terminate() all_threads.clear(); msg_threads.clear(); - idle = true; - closed = true; + SetIdle(true); + SetClosed(true); terminating = false; } @@ -56,7 +56,7 @@ void Manager::AddThread(BasicThread* thread) { DBG_LOG(DBG_THREADING, "Adding thread %s ...", thread->Name()); all_threads.push_back(thread); - idle = false; + SetIdle(false); } void Manager::AddMsgThread(MsgThread* thread) diff --git a/src/threading/Manager.h b/src/threading/Manager.h index e839749a91..c94cc41aaa 100644 --- a/src/threading/Manager.h +++ b/src/threading/Manager.h @@ -4,7 +4,7 @@ #include -#include "IOSource.h" +#include "iosource/IOSource.h" #include "BasicThread.h" #include "MsgThread.h" @@ -21,7 +21,7 @@ namespace threading { * their outgoing message queue on a regular basis and feeds data sent into * the rest of Bro. It also triggers the regular heartbeats. */ -class Manager : public IOSource +class Manager : public iosource::IOSource { public: /** diff --git a/src/util.cc b/src/util.cc index ad55e3f75e..358e88b0e2 100644 --- a/src/util.cc +++ b/src/util.cc @@ -43,6 +43,7 @@ #include "NetVar.h" #include "Net.h" #include "Reporter.h" +#include "iosource/Manager.h" /** * Return IP address without enclosing brackets and any leading 0x. @@ -1351,11 +1352,13 @@ double current_time(bool real) double t = double(tv.tv_sec) + double(tv.tv_usec) / 1e6; - if ( ! pseudo_realtime || real || pkt_srcs.length() == 0 ) + const iosource::Manager::PktSrcList& pkt_srcs(iosource_mgr->GetPktSrcs()); + + if ( ! pseudo_realtime || real || pkt_srcs.empty() ) return t; // This obviously only works for a single source ... - PktSrc* src = pkt_srcs[0]; + iosource::PktSrc* src = pkt_srcs.front(); if ( net_is_processing_suspended() ) return src->CurrentPacketTimestamp();