diff --git a/scripts/base/init-bare.zeek b/scripts/base/init-bare.zeek index 7d539f3948..a253dc1fca 100644 --- a/scripts/base/init-bare.zeek +++ b/scripts/base/init-bare.zeek @@ -4978,6 +4978,71 @@ export { }; } +module MQTT; +export { + type MQTT::ConnectMsg: record { + ## Protocol name + protocol_name : string; + ## Protocol version + protocol_version : count; + + ## Identifies the Client to the Server. + client_id : string; + ## The maximum time interval that is permitted to elapse between the + ## point at which the Client finishes transmitting one Control Packet + ## and the point it starts sending the next. + keep_alive : interval; + + ## The clean_session flag indicates if the server should or shouldn't + ## use a clean session or use existing previous session state. + clean_session : bool; + + ## Specifies if the Will Message is to be retained when it is published. + will_retain : bool; + ## Specifies the QoS level to be used when publishing the Will Message. + will_qos : count; + ## Topic to publish the Will message to. + will_topic : string &optional; + ## The actual Will message to publish. + will_msg : string &optional; + + ## Username to use for authentication to the server. + username : string &optional; + ## Pass to use for authentication to the server. + password : string &optional; + }; + + type MQTT::ConnectAckMsg: record { + ## Return code from the connack message + return_code: count; + + ## The Session present flag helps the client + ## establish whether the Client and Server + ## have a consistent view about whether there + ## is already stored Session state. + session_present: bool; + }; + + type MQTT::PublishMsg: record { + ## Indicates if this is the first attempt at publishing the message. + dup : bool; + + ## Indicates what level of QoS is enabled for this message. + qos : count; + + ## Indicates if the server should retain this message so that clients + ## subscribing to the topic in the future will receive this message + ## automatically. + retain : bool; + + ## Name of the topic the published message is directed into. + topic : string; + + ## Payload of the published message. + payload : string; + }; +} + module Cluster; export { type Cluster::Pool: record {}; diff --git a/scripts/base/init-default.zeek b/scripts/base/init-default.zeek index 8c8aca01f8..52fe7dca8a 100644 --- a/scripts/base/init-default.zeek +++ b/scripts/base/init-default.zeek @@ -54,6 +54,7 @@ @load base/protocols/irc @load base/protocols/krb @load base/protocols/modbus +@load base/protocols/mqtt @load base/protocols/mysql @load base/protocols/ntlm @load base/protocols/ntp diff --git a/scripts/base/protocols/mqtt/__load__.bro b/scripts/base/protocols/mqtt/__load__.bro new file mode 100644 index 0000000000..7b9b0d9a6c --- /dev/null +++ b/scripts/base/protocols/mqtt/__load__.bro @@ -0,0 +1,2 @@ +@load ./main +@load-sigs ./dpd.sig diff --git a/scripts/base/protocols/mqtt/consts.bro b/scripts/base/protocols/mqtt/consts.bro new file mode 100644 index 0000000000..fb5bc261dc --- /dev/null +++ b/scripts/base/protocols/mqtt/consts.bro @@ -0,0 +1,43 @@ +##! Constants definitions for MQTT + +module MQTT; + +export { + const msg_types = { + [1] = "connect", + [2] = "connack", + [3] = "publish", + [4] = "puback", + [5] = "pubrec", + [6] = "pubrel", + [7] = "pubcomp", + [8] = "subscribe", + [9] = "suback", + [10] = "unsubscribe", + [11] = "unsuback", + [12] = "pingreq", + [13] = "pingresp", + [14] = "disconnect", + } &default = function(n: count): string { return fmt("unknown-msg-type-%d", n); }; + + const versions = { + [3] = "3.1", + [4] = "3.1.1", + [5] = "5.0", + } &default = function(n: count): string { return fmt("unknown-version-%d", n); }; + + const qos_levels = { + [0] = "at most once", + [1] = "at least once", + [2] = "exactly once", + } &default = function(n: count): string { return fmt("unknown-qos-level-%d", n); }; + + const return_codes = { + [0] = "Connection Accepted", + [1] = "Refused: unacceptable protocol version", + [2] = "Refused: identifier rejected", + [3] = "Refused: server unavailable", + [4] = "Refused: bad user name or password", + [5] = "Refused: not authorized", + } &default = function(n: count): string { return fmt("unknown-return-code-%d", n); }; +} diff --git a/scripts/base/protocols/mqtt/dpd.sig b/scripts/base/protocols/mqtt/dpd.sig new file mode 100644 index 0000000000..206c20337f --- /dev/null +++ b/scripts/base/protocols/mqtt/dpd.sig @@ -0,0 +1,5 @@ + +signature dpd_mqtt { + payload /^.{4,7}MQ/ + enable "mqtt" +} diff --git a/scripts/base/protocols/mqtt/main.bro b/scripts/base/protocols/mqtt/main.bro new file mode 100644 index 0000000000..96a061a94e --- /dev/null +++ b/scripts/base/protocols/mqtt/main.bro @@ -0,0 +1,349 @@ +##! Implements base functionality for MQTT analysis. +##! Generates the mqtt.log file. + +module MQTT; + +@load ./consts.bro + +export { + redef enum Log::ID += { + CONNECT_LOG, + SUBSCRIBE_LOG, + PUBLISH_LOG, + }; + + type MQTT::SubUnsub: enum { + MQTT::SUBSCRIBE, + MQTT::UNSUBSCRIBE, + } &redef; + + type ConnectInfo: record { + ## Timestamp for when the event happened + ts: time &log; + ## Unique ID for the connection + uid: string &log; + ## The connection's 4-tuple of endpoint addresses/ports + id: conn_id &log; + + ## Indicates the protocol name + proto_name: string &log &optional; + ## The version of the protocol in use + proto_version: string &log &optional; + ## Unique identifier for the client + client_id: string &log &optional; + ## Status message from the server in response to the connect request + connect_status: string &log &optional; + + ## Topic to publish a "last will and testament" message to + will_topic: string &log &optional; + ## Payload to publish as a "last will and testament" + will_payload: string &log &optional; + }; + + type SubscribeInfo: record { + ## Timestamp for when the subscribe or unsubscribe request started + ts: time &log; + ## UID for the connection + uid: string &log; + ## ID fields for the connection + id: conn_id &log; + + ## Indicates if a subscribe or unsubscribe action is taking place + action: SubUnsub &log; + ## The topic (or topic pattern) being subscribed to + topic: string &log; + ## QoS level requested for messages from subscribed topics + qos_level: count &log &optional; + ## QoS level the server granted + granted_qos_level: count &log &optional; + ## Indicates if the request was acked by the server + ack: bool &log &default=F; + }; + + type PublishInfo: record { + ## Timestamp for when the publish message started + ts: time &log; + ## UID for the connection + uid: string &log; + ## ID fields for the connection + id: conn_id &log; + + ## Indicates if the message was published by the client of + ## this connection or published to the client. + from_client: bool &log; + ## Indicates if the message was to be retained by the server + retain: bool &log; + ## QoS level set for the message + qos: string &log; + ## Status of the published message. This will be set to "incomplete_qos" + ## if the full back and forth for the requested level of QoS was not seen. + ## Otherwise if it's successful the field will be "ok". + status: string &log &default="incomplete_qos"; + + ## Topic the message was published to + topic: string &log; + ## Payload of the message + payload: string &log; + + ## NOT LOGGED! Track if the message was acked + ack: bool &default=F; + ## NOT LOGGED! Indicates if the server sent the RECEIVED qos message + rec: bool &default=F; + ## NOT LOGGED! Indicates if the client sent the RELEASE qos message + rel: bool &default=F; + ## NOT LOGGED! Indicates if the server sent the COMPLETE qos message + comp: bool &default=F; + ## NOT LOGGED! Internally used for comparing numeric qos level + qos_level: count &default=0; + }; + + ## Event that can be handled to access the MQTT record as it is sent on + ## to the logging framework. + global MQTT::log_mqtt: event(rec: ConnectInfo); +} + +global publish_expire: function(tbl: table[count] of PublishInfo, idx: count): interval; +global subscribe_expire: function(tbl: table[count] of SubscribeInfo, idx: count): interval; + +type State: record { + publish: table[count] of PublishInfo &optional &write_expire=5secs &expire_func=publish_expire; + subscribe: table[count] of SubscribeInfo &optional &write_expire=5secs &expire_func=subscribe_expire; +}; + +function publish_expire(tbl: table[count] of PublishInfo, idx: count): interval + { + Log::write(PUBLISH_LOG, tbl[idx]); + return 0sec; + } + +function subscribe_expire(tbl: table[count] of SubscribeInfo, idx: count): interval + { + Log::write(SUBSCRIBE_LOG, tbl[idx]); + return 0sec; + } + +redef record connection += { + mqtt: ConnectInfo &optional; + mqtt_state: State &optional; +}; + +const ports = { 1883/tcp }; +redef likely_server_ports += { ports }; + +event bro_init() &priority=5 + { + Log::create_stream(MQTT::CONNECT_LOG, [$columns=ConnectInfo, $ev=log_mqtt, $path="mqtt_connect"]); + Log::create_stream(MQTT::SUBSCRIBE_LOG, [$columns=SubscribeInfo, $path="mqtt_subscribe"]); + Log::create_stream(MQTT::PUBLISH_LOG, [$columns=PublishInfo, $path="mqtt_publish"]); + + Analyzer::register_for_ports(Analyzer::ANALYZER_MQTT, ports); + } + +function set_session(c: connection): ConnectInfo + { + if ( ! c?$mqtt ) + c$mqtt = ConnectInfo($ts = network_time(), + $uid = c$uid, + $id = c$id); + + if ( ! c?$mqtt_state ) + { + c$mqtt_state = State(); + c$mqtt_state$publish = table(); + c$mqtt_state$subscribe = table(); + } + + return c$mqtt; + } + +event mqtt_connect(c: connection, msg: MQTT::ConnectMsg) &priority=5 + { + local info = set_session(c); + + info$proto_name = msg$protocol_name; + info$proto_version = versions[msg$protocol_version]; + info$client_id = msg$client_id; + if ( msg?$will_topic ) + info$will_topic = msg$will_topic; + if ( msg?$will_msg ) + info$will_payload = msg$will_msg; + } + +event mqtt_connack(c: connection, msg: MQTT::ConnectAckMsg) &priority=5 + { + local info = set_session(c); + + info$connect_status = return_codes[msg$return_code]; + + Log::write(CONNECT_LOG, info); + } + +event mqtt_publish(c: connection, is_orig: bool, msg_id: count, msg: MQTT::PublishMsg) &priority=5 + { + set_session(c); + + local pi = PublishInfo($ts=network_time(), + $uid=c$uid, + $id=c$id, + $from_client=is_orig, + $retain=msg$retain, + $qos=qos_levels[msg$qos], + $qos_level=msg$qos, + $topic=msg$topic, + $payload=msg$payload); + if ( pi$qos_level == 0 ) + pi$status="ok"; + + c$mqtt_state$publish[msg_id] = pi; + } + +event mqtt_publish(c: connection, is_orig: bool, msg_id: count, msg: MQTT::PublishMsg) &priority=-5 + { + local pi = c$mqtt_state$publish[msg_id]; + + if ( pi$qos_level == 0 ) + { + Log::write(PUBLISH_LOG, pi); + delete c$mqtt_state$publish[msg_id]; + } + } + +event mqtt_puback(c: connection, is_orig: bool, msg_id: count) &priority=5 + { + set_session(c); + + if ( msg_id in c$mqtt_state$publish ) + { + local pi = c$mqtt_state$publish[msg_id]; + pi$ack = T; + if ( pi$qos_level == 1 ) + pi$status = "ok"; + } + } + +event mqtt_puback(c: connection, is_orig: bool, msg_id: count) &priority=-5 + { + if ( msg_id in c$mqtt_state$publish ) + { + local pi = c$mqtt_state$publish[msg_id]; + + if ( pi$status == "ok" ) + { + Log::write(PUBLISH_LOG, pi); + delete c$mqtt_state$publish[msg_id]; + } + } + } + +event mqtt_pubrec(c: connection, is_orig: bool, msg_id: count) &priority=5 + { + set_session(c); + + if ( msg_id in c$mqtt_state$publish ) + { + local pi = c$mqtt_state$publish[msg_id]; + pi$rec = T; + } + } + +event mqtt_pubrel(c: connection, is_orig: bool, msg_id: count) &priority=5 + { + set_session(c); + + if ( msg_id in c$mqtt_state$publish ) + { + local pi = c$mqtt_state$publish[msg_id]; + pi$rel = T; + } + } + +event mqtt_pubcomp(c: connection, is_orig: bool, msg_id: count) &priority=5 + { + local info = set_session(c); + if ( msg_id !in c$mqtt_state$publish ) + return; + + local pi = c$mqtt_state$publish[msg_id]; + pi$comp = T; + + if ( pi$qos_level == 2 && pi$rec && pi$rel && pi$comp ) + pi$status = "ok"; + } + +event mqtt_pubcomp(c: connection, is_orig: bool, msg_id: count) &priority=-5 + { + if ( msg_id !in c$mqtt_state$publish ) + return; + + local pi = c$mqtt_state$publish[msg_id]; + if ( pi$status == "ok" ) + { + Log::write(PUBLISH_LOG, pi); + delete c$mqtt_state$publish[msg_id]; + } + } + + +event mqtt_subscribe(c: connection, msg_id: count, topic: string, requested_qos: count) &priority=5 + { + local info = set_session(c); + + local si = SubscribeInfo($ts = network_time(), + $uid = c$uid, + $id = c$id, + $action = MQTT::SUBSCRIBE, + $topic = topic, + $qos_level = requested_qos); + + c$mqtt_state$subscribe[msg_id] = si; + } + +event mqtt_suback(c: connection, msg_id: count, granted_qos: count) &priority=5 + { + set_session(c); + + local x = c$mqtt_state$subscribe[msg_id]; + x$granted_qos_level = granted_qos; + x$ack = T; + + Log::write(MQTT::SUBSCRIBE_LOG, x); + delete c$mqtt_state$subscribe[msg_id]; + } + +event mqtt_unsubscribe(c: connection, msg_id: count, topic: string) &priority=5 + { + set_session(c); + + local si = SubscribeInfo($ts = network_time(), + $uid = c$uid, + $id = c$id, + $action = MQTT::UNSUBSCRIBE, + $topic = topic); + + c$mqtt_state$subscribe[msg_id] = si; + } + +event mqtt_unsuback(c: connection, msg_id: count) &priority=-5 + { + set_session(c); + + local x = c$mqtt_state$subscribe[msg_id]; + x$ack = T; + + Log::write(MQTT::SUBSCRIBE_LOG, x); + delete c$mqtt_state$subscribe[msg_id]; + } + +#event mqtt_pingreq(c: connection) &priority=5 +# { +# } +# +#event mqtt_pingresp(c: connection) &priority=5 +# { +# } + +event mqtt_disconnect(c: connection) &priority=-5 + { + #Log::write(MQTT::CONNECT_LOG, info); + } + diff --git a/src/analyzer/protocol/CMakeLists.txt b/src/analyzer/protocol/CMakeLists.txt index d5024a2ff1..a6f091fa45 100644 --- a/src/analyzer/protocol/CMakeLists.txt +++ b/src/analyzer/protocol/CMakeLists.txt @@ -22,6 +22,7 @@ add_subdirectory(krb) add_subdirectory(login) add_subdirectory(mime) add_subdirectory(modbus) +add_subdirectory(mqtt) add_subdirectory(mysql) add_subdirectory(ncp) add_subdirectory(netbios) diff --git a/src/analyzer/protocol/mqtt/CMakeLists.txt b/src/analyzer/protocol/mqtt/CMakeLists.txt new file mode 100644 index 0000000000..5b3a7d5066 --- /dev/null +++ b/src/analyzer/protocol/mqtt/CMakeLists.txt @@ -0,0 +1,27 @@ + +include(BroPlugin) + +include_directories(BEFORE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) + +bro_plugin_begin(Bro MQTT) +bro_plugin_cc(MQTT.cc Plugin.cc) +bro_plugin_bif(types.bif events.bif) +bro_plugin_pac(mqtt.pac + mqtt-protocol.pac + commands/connect.pac + commands/connack.pac + commands/publish.pac + commands/puback.pac + commands/pubrec.pac + commands/pubrel.pac + commands/pubcomp.pac + commands/subscribe.pac + commands/suback.pac + commands/unsuback.pac + commands/unsubscribe.pac + commands/disconnect.pac + commands/pingreq.pac + commands/pingresp.pac + ) + +bro_plugin_end() diff --git a/src/analyzer/protocol/mqtt/MQTT.cc b/src/analyzer/protocol/mqtt/MQTT.cc new file mode 100644 index 0000000000..6aa924100a --- /dev/null +++ b/src/analyzer/protocol/mqtt/MQTT.cc @@ -0,0 +1,60 @@ +// See the file in the main distribution directory for copyright. + +#include "plugin/Plugin.h" + +#include "MQTT.h" +#include "analyzer/protocol/tcp/TCP_Reassembler.h" +#include "Reporter.h" +#include "events.bif.h" + +using namespace analyzer::MQTT; + +MQTT_Analyzer::MQTT_Analyzer(Connection* c) + : tcp::TCP_ApplicationAnalyzer("MQTT", c) + { + interp = new binpac::MQTT::MQTT_Conn(this); + + had_gap = false; + } + +MQTT_Analyzer::~MQTT_Analyzer() + { + delete interp; + } + +void MQTT_Analyzer::Done() + { + tcp::TCP_ApplicationAnalyzer::Done(); + + interp->FlowEOF(true); + interp->FlowEOF(false); + } + +void MQTT_Analyzer::EndpointEOF(bool is_orig) + { + tcp::TCP_ApplicationAnalyzer::EndpointEOF(is_orig); + interp->FlowEOF(is_orig); + } + +void MQTT_Analyzer::DeliverStream(int len, const u_char* data, bool orig) + { + tcp::TCP_ApplicationAnalyzer::DeliverStream(len, data, orig); + + assert(TCP()); + + try + { + interp->NewData(orig, data, data + len); + } + catch ( const binpac::Exception& e ) + { + ProtocolViolation(fmt("Binpac exception: %s", e.c_msg())); + } + } + +void MQTT_Analyzer::Undelivered(uint64 seq, int len, bool orig) + { + tcp::TCP_ApplicationAnalyzer::Undelivered(seq, len, orig); + had_gap = true; + interp->NewGap(orig, len); + } diff --git a/src/analyzer/protocol/mqtt/MQTT.h b/src/analyzer/protocol/mqtt/MQTT.h new file mode 100644 index 0000000000..3e30ffb06c --- /dev/null +++ b/src/analyzer/protocol/mqtt/MQTT.h @@ -0,0 +1,38 @@ +// Generated by binpac_quickstart + +#ifndef ANALYZER_PROTOCOL_MQTT_MQTT_H +#define ANALYZER_PROTOCOL_MQTT_MQTT_H + +#include "events.bif.h" + +#include "analyzer/protocol/tcp/TCP.h" + +#include "mqtt_pac.h" + +namespace analyzer { namespace MQTT { + +class MQTT_Analyzer: public tcp::TCP_ApplicationAnalyzer { + +public: + MQTT_Analyzer(Connection* conn); + ~MQTT_Analyzer() override; + + void Done() override; + void DeliverStream(int len, const u_char* data, bool orig) override; + void Undelivered(uint64 seq, int len, bool orig) override; + void EndpointEOF(bool is_orig) override; + + + static analyzer::Analyzer* InstantiateAnalyzer(Connection* conn) + { return new MQTT_Analyzer(conn); } + +protected: + binpac::MQTT::MQTT_Conn* interp; + + bool had_gap; + +}; + +} } // namespace analyzer::* + +#endif diff --git a/src/analyzer/protocol/mqtt/Plugin.cc b/src/analyzer/protocol/mqtt/Plugin.cc new file mode 100644 index 0000000000..f428344222 --- /dev/null +++ b/src/analyzer/protocol/mqtt/Plugin.cc @@ -0,0 +1,25 @@ +// See the file in the main distribution directory for copyright. + +#include "plugin/Plugin.h" + +#include "MQTT.h" + +namespace plugin { +namespace Bro_MQTT { + +class Plugin : public plugin::Plugin { +public: + plugin::Configuration Configure() + { + AddComponent(new ::analyzer::Component("MQTT", + ::analyzer::MQTT::MQTT_Analyzer::InstantiateAnalyzer)); + + plugin::Configuration config; + config.name = "Bro::MQTT"; + config.description = "Message Queuing Telemetry Transport v3.1.1 Protocol analyzer"; + return config; + } +} plugin; + +} +} diff --git a/src/analyzer/protocol/mqtt/commands/connack.pac b/src/analyzer/protocol/mqtt/commands/connack.pac new file mode 100644 index 0000000000..49b8eb131e --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/connack.pac @@ -0,0 +1,28 @@ +refine casetype Command += { + MQTT_CONNACK -> connack : MQTT_connack; +}; + +type MQTT_connack = record { + flags : uint8; + return_code : uint8; +} &let { + session_present : bool = (flags & 0x01) != 0; + proc: bool = $context.flow.proc_mqtt_connack(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_connack(msg: MQTT_connack): bool + %{ + if ( mqtt_connack ) + { + auto m = new RecordVal(BifType::Record::MQTT::ConnectAckMsg); + m->Assign(0, val_mgr->GetBool(${msg.return_code})); + m->Assign(1, val_mgr->GetBool(${msg.session_present})); + BifEvent::generate_mqtt_connack(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + m); + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/connect.pac b/src/analyzer/protocol/mqtt/commands/connect.pac new file mode 100644 index 0000000000..92ff4abd90 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/connect.pac @@ -0,0 +1,87 @@ +refine casetype Command += { + MQTT_CONNECT -> connect : MQTT_connect; +}; + +type MQTT_will = record { + topic : MQTT_string; + msg : MQTT_string; +}; + +type MQTT_connect = record { + protocol_name : MQTT_string; + protocol_version : int8; + connect_flags : uint8; + keep_alive : uint16; + client_id : MQTT_string; + + # payload starts + will_fields: case will_flag of { + true -> will : MQTT_will; + false -> nofield1 : empty; + }; + username_fields: case username of { + true -> uname : MQTT_string; + false -> nofield2 : empty; + }; + password_fields: case password of { + true -> pass : MQTT_string; + false -> nofield3 : empty; + }; +} &let { + username : bool = (connect_flags & 0x80) != 0; + password : bool = (connect_flags & 0x40) != 0; + will_retain : bool = (connect_flags & 0x20) != 0; + will_qos : uint8 = (connect_flags & 0x18) >> 3; + will_flag : bool = (connect_flags & 0x04) != 0; + clean_session : bool = (connect_flags & 0x02) != 0; + + proc: bool = $context.flow.proc_mqtt_connect(this); +}; + + +refine flow MQTT_Flow += { + function proc_mqtt_connect(msg: MQTT_connect): bool + %{ + if ( mqtt_connect ) + { + auto m = new RecordVal(BifType::Record::MQTT::ConnectMsg); + m->Assign(0, new StringVal(${msg.protocol_name.str}.length(), + reinterpret_cast(${msg.protocol_name.str}.begin()))); + m->Assign(1, val_mgr->GetCount(${msg.protocol_version})); + m->Assign(2, new StringVal(${msg.client_id.str}.length(), + reinterpret_cast(${msg.client_id.str}.begin()))); + m->Assign(3, new IntervalVal(double(${msg.keep_alive}), Seconds)); + + m->Assign(4, val_mgr->GetBool(${msg.clean_session})); + m->Assign(5, val_mgr->GetBool(${msg.will_retain})); + m->Assign(6, val_mgr->GetCount(${msg.will_qos})); + + if ( ${msg.will_flag} ) + { + m->Assign(7, new StringVal(${msg.will.topic.str}.length(), + reinterpret_cast(${msg.will.topic.str}.begin()))); + m->Assign(8, new StringVal(${msg.will.msg.str}.length(), + reinterpret_cast(${msg.will.msg.str}.begin()))); + } + + if ( ${msg.username} ) + { + m->Assign(9, new StringVal(${msg.uname.str}.length(), + reinterpret_cast(${msg.uname.str}.begin()))); + } + if ( ${msg.password} ) + { + m->Assign(10, new StringVal(${msg.pass.str}.length(), + reinterpret_cast(${msg.pass.str}.begin()))); + } + + BifEvent::generate_mqtt_connect(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + m); + } + + // If a connect message was seen, let's say that confirms it. + connection()->bro_analyzer()->ProtocolConfirmation(); + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/disconnect.pac b/src/analyzer/protocol/mqtt/commands/disconnect.pac new file mode 100644 index 0000000000..16d4f9082e --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/disconnect.pac @@ -0,0 +1,20 @@ +refine casetype Command += { + MQTT_DISCONNECT -> disconnect : MQTT_disconnect; +}; + +type MQTT_disconnect = empty &let { + proc: bool = $context.flow.proc_mqtt_disconnect(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_disconnect(msg: MQTT_disconnect): bool + %{ + if ( mqtt_disconnect ) + { + BifEvent::generate_mqtt_disconnect(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn()); + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/pingreq.pac b/src/analyzer/protocol/mqtt/commands/pingreq.pac new file mode 100644 index 0000000000..6c40d89128 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/pingreq.pac @@ -0,0 +1,20 @@ +refine casetype Command += { + MQTT_PINGREQ -> pingreq : MQTT_pingreq; +}; + +type MQTT_pingreq = empty &let { + proc: bool = $context.flow.proc_mqtt_pingreq(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_pingreq(msg: MQTT_pingreq): bool + %{ + if ( mqtt_pingreq ) + { + BifEvent::generate_mqtt_pingreq(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn()); + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/pingresp.pac b/src/analyzer/protocol/mqtt/commands/pingresp.pac new file mode 100644 index 0000000000..af8daed56c --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/pingresp.pac @@ -0,0 +1,20 @@ +refine casetype Command += { + MQTT_PINGRESP -> pingresp : MQTT_pingresp; +}; + +type MQTT_pingresp = empty &let { + proc: bool = $context.flow.proc_mqtt_pingresp(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_pingresp(msg: MQTT_pingresp): bool + %{ + if ( mqtt_pingresp ) + { + BifEvent::generate_mqtt_pingresp(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn()); + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/puback.pac b/src/analyzer/protocol/mqtt/commands/puback.pac new file mode 100644 index 0000000000..aecf16e9c3 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/puback.pac @@ -0,0 +1,23 @@ +refine casetype Command += { + MQTT_PUBACK -> puback : MQTT_puback(pdu); +}; + +type MQTT_puback(pdu: MQTT_PDU) = record { + msg_id : uint16; +} &let { + proc: bool = $context.flow.proc_mqtt_puback(this, pdu.is_orig); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_puback(msg: MQTT_puback, is_orig: bool): bool + %{ + if ( mqtt_puback ) + { + BifEvent::generate_mqtt_puback(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + is_orig, + ${msg.msg_id}); + } + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/pubcomp.pac b/src/analyzer/protocol/mqtt/commands/pubcomp.pac new file mode 100644 index 0000000000..ec989b06c9 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/pubcomp.pac @@ -0,0 +1,23 @@ +refine casetype Command += { + MQTT_PUBCOMP -> pubcomp : MQTT_pubcomp(pdu); +}; + +type MQTT_pubcomp(pdu: MQTT_PDU) = record { + msg_id : uint16; +} &let { + proc: bool = $context.flow.proc_mqtt_pubcomp(this, pdu.is_orig); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_pubcomp(msg: MQTT_pubcomp, is_orig: bool): bool + %{ + if ( mqtt_pubcomp ) + { + BifEvent::generate_mqtt_pubcomp(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + is_orig, + ${msg.msg_id}); + } + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/publish.pac b/src/analyzer/protocol/mqtt/commands/publish.pac new file mode 100644 index 0000000000..b059b99ab1 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/publish.pac @@ -0,0 +1,47 @@ +refine casetype Command += { + MQTT_PUBLISH -> publish : MQTT_publish(pdu); +}; + +type MQTT_publish(pdu: MQTT_PDU) = record { + topic : MQTT_string; + # If qos is zero, there won't be a msg_id field. + has_msg_id: case qos of { + 0 -> none : empty; + default -> msg_id : uint16; + }; + payload : bytestring &restofdata; +} &let { + dup : bool = (pdu.fixed_header & 0x08) != 0; + qos : uint8 = (pdu.fixed_header & 0x06) >> 1; + retain : bool = (pdu.fixed_header & 0x01) != 0; + + proc: bool = $context.flow.proc_mqtt_publish(this, pdu); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_publish(msg: MQTT_publish, pdu: MQTT_PDU): bool + %{ + if ( mqtt_publish ) + { + auto m = new RecordVal(BifType::Record::MQTT::PublishMsg); + m->Assign(0, val_mgr->GetBool(${msg.dup})); + m->Assign(1, val_mgr->GetCount(${msg.qos})); + m->Assign(2, val_mgr->GetBool(${msg.retain})); + m->Assign(3, new StringVal(${msg.topic.str}.length(), + reinterpret_cast(${msg.topic.str}.begin()))); + m->Assign(4, new StringVal(${msg.payload}.length(), + reinterpret_cast(${msg.payload}.begin()))); + + BifEvent::generate_mqtt_publish(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + ${pdu.is_orig}, + ${msg.qos} == 0 ? 0 : ${msg.msg_id}, + m); + } + + // If a publish message was seen, let's say that confirms it. + connection()->bro_analyzer()->ProtocolConfirmation(); + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/pubrec.pac b/src/analyzer/protocol/mqtt/commands/pubrec.pac new file mode 100644 index 0000000000..386244f512 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/pubrec.pac @@ -0,0 +1,23 @@ +refine casetype Command += { + MQTT_PUBREC -> pubrec : MQTT_pubrec(pdu); +}; + +type MQTT_pubrec(pdu: MQTT_PDU) = record { + msg_id : uint16; +} &let { + proc: bool = $context.flow.proc_mqtt_pubrec(this, pdu.is_orig); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_pubrec(msg: MQTT_pubrec, is_orig: bool): bool + %{ + if ( mqtt_pubrec ) + { + BifEvent::generate_mqtt_pubrec(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + is_orig, + ${msg.msg_id}); + } + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/pubrel.pac b/src/analyzer/protocol/mqtt/commands/pubrel.pac new file mode 100644 index 0000000000..16b9f4fe45 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/pubrel.pac @@ -0,0 +1,23 @@ +refine casetype Command += { + MQTT_PUBREL -> pubrel : MQTT_pubrel(pdu); +}; + +type MQTT_pubrel(pdu: MQTT_PDU) = record { + msg_id : uint16; +} &let { + proc: bool = $context.flow.proc_mqtt_pubrel(this, pdu.is_orig); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_pubrel(msg: MQTT_pubrel, is_orig: bool): bool + %{ + if ( mqtt_pubrel ) + { + BifEvent::generate_mqtt_pubrel(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + is_orig, + ${msg.msg_id}); + } + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/suback.pac b/src/analyzer/protocol/mqtt/commands/suback.pac new file mode 100644 index 0000000000..cf6049cf8b --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/suback.pac @@ -0,0 +1,25 @@ +refine casetype Command += { + MQTT_SUBACK -> suback : MQTT_suback; +}; + +type MQTT_suback = record { + msg_id : uint16; + granted_QoS : uint8; +} &let { + proc: bool = $context.flow.proc_mqtt_suback(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_suback(msg: MQTT_suback): bool + %{ + if ( mqtt_suback ) + { + BifEvent::generate_mqtt_suback(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + ${msg.msg_id}, + ${msg.granted_QoS}); + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/subscribe.pac b/src/analyzer/protocol/mqtt/commands/subscribe.pac new file mode 100644 index 0000000000..bbe2ea2172 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/subscribe.pac @@ -0,0 +1,37 @@ +refine casetype Command += { + MQTT_SUBSCRIBE -> subscribe : MQTT_subscribe; +}; + +type MQTT_subscribe_topic = record { + name : MQTT_string; + requested_QoS : uint8; +}; + +type MQTT_subscribe = record { + msg_id : uint16; + topics : MQTT_subscribe_topic[]; +} &let { + proc: bool = $context.flow.proc_mqtt_subscribe(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_subscribe(msg: MQTT_subscribe): bool + %{ + if ( mqtt_subscribe ) + { + for (auto topic: *${msg.topics}) + { + auto subscribe_topic = new StringVal(${topic.name.str}.length(), + reinterpret_cast(${topic.name.str}.begin())); + + BifEvent::generate_mqtt_subscribe(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + ${msg.msg_id}, + subscribe_topic, + ${topic.requested_QoS}); + } + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/unsuback.pac b/src/analyzer/protocol/mqtt/commands/unsuback.pac new file mode 100644 index 0000000000..65e85ab981 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/unsuback.pac @@ -0,0 +1,23 @@ +refine casetype Command += { + MQTT_UNSUBACK -> unsuback : MQTT_unsuback; +}; + +type MQTT_unsuback = record { + msg_id : uint16; +} &let { + proc: bool = $context.flow.proc_mqtt_unsuback(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_unsuback(msg: MQTT_unsuback): bool + %{ + if ( mqtt_unsuback ) + { + BifEvent::generate_mqtt_unsuback(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + ${msg.msg_id}); + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/commands/unsubscribe.pac b/src/analyzer/protocol/mqtt/commands/unsubscribe.pac new file mode 100644 index 0000000000..4c951881a0 --- /dev/null +++ b/src/analyzer/protocol/mqtt/commands/unsubscribe.pac @@ -0,0 +1,33 @@ +refine casetype Command += { + MQTT_UNSUBSCRIBE -> unsubscribe : MQTT_unsubscribe; +}; + +type MQTT_unsubscribe = record { + msg_id : uint16; + topics : MQTT_string[]; +} &let { + proc: bool = $context.flow.proc_mqtt_unsubscribe(this); +}; + +refine flow MQTT_Flow += { + function proc_mqtt_unsubscribe(msg: MQTT_unsubscribe): bool + %{ + if ( mqtt_unsubscribe ) + { + StringVal* unsubscribe_topic = 0; + + for (auto topic: *${msg.topics}) + { + unsubscribe_topic = new StringVal(${topic.str}.length(), + reinterpret_cast(${topic.str}.begin())); + } + + BifEvent::generate_mqtt_unsubscribe(connection()->bro_analyzer(), + connection()->bro_analyzer()->Conn(), + ${msg.msg_id}, + unsubscribe_topic); + } + + return true; + %} +}; diff --git a/src/analyzer/protocol/mqtt/events.bif b/src/analyzer/protocol/mqtt/events.bif new file mode 100644 index 0000000000..b079e1afc5 --- /dev/null +++ b/src/analyzer/protocol/mqtt/events.bif @@ -0,0 +1,107 @@ +## Generated for MQTT "client requests a connection" messages +## +## c: The connection +## +## msg: MQTT connect message fields. +event mqtt_connect%(c: connection, msg: MQTT::ConnectMsg%); + +## Generated for MQTT acknowledge connection messages +## +## c: The connection +## +## msg: MQTT connect ack message fields. +event mqtt_connack%(c: connection, msg: MQTT::ConnectAckMsg%); + +## Generated for MQTT publish messages +## +## c: The connection +## +## is_orig: Direction in which the message was sent +## +## msg: The MQTT publish message record. +event mqtt_publish%(c: connection, is_orig: bool, msg_id: count, msg: MQTT::PublishMsg%); + +## Generated for MQTT publish acknowledgement messages +## +## c: The connection +## +## is_orig: Direction in which the message was sent +## +## msg_id: The id value for the message. +event mqtt_puback%(c: connection, is_orig: bool, msg_id: count%); + +## Generated for MQTT publish received messages (QoS 2 publish received, part 1) +## +## c: The connection +## +## is_orig: Direction in which the message was sent +## +## msg_id: The id value for the message. +event mqtt_pubrec%(c: connection, is_orig: bool, msg_id: count%); + +## Generated for MQTT publish release messages (QoS 2 publish received, part 2) +## +## c: The connection +## +## is_orig: Direction in which the message was sent +## +## msg_id: The id value for the message. +event mqtt_pubrel%(c: connection, is_orig: bool, msg_id: count%); + +## Generated for MQTT publish complete messages (QoS 2 publish received, part 3) +## +## c: The connection +## +## is_orig: Direction in which the message was sent +## +## msg_id: The id value for the message. +event mqtt_pubcomp%(c: connection, is_orig: bool, msg_id: count%); + +## Generated for MQTT subscribe messages +## +## c: The connection +## +## is_orig: Direction in which the message was sent +## +## msg_id: The id value for the message. +event mqtt_subscribe%(c: connection, msg_id: count, topic: string, requested_qos: count%); + +## Generated for MQTT subscribe messages +## +## c: The connection +## +## is_orig: Direction in which the message was sent +## +## msg_id: The id value for the message. +event mqtt_suback%(c: connection, msg_id: count, granted_qos: count%); + +## Generated for MQTT unsubscribe messages sent by the client +## +## c: The connection +## +## msg_id: The id value for the message. +## +## topic: The topic being unsubscribed from +event mqtt_unsubscribe%(c: connection, msg_id: count, topic: string%); + +## Generated for MQTT unsubscribe acknowledgements sent by the server +## +## c: The connection +## +## msg_id: The id value for the message. +event mqtt_unsuback%(c: connection, msg_id: count%); + +## Generated for MQTT ping requests sent by the client. +## +## c: The connection +event mqtt_pingreq%(c: connection%); + +## Generated for MQTT ping responses sent by the server. +## +## c: The connection +event mqtt_pingresp%(c: connection%); + +## Generated for MQTT disconnect messages sent by the client when it is diconnecting cleanly. +## +## c: The connection +event mqtt_disconnect%(c: connection%); diff --git a/src/analyzer/protocol/mqtt/mqtt-protocol.pac b/src/analyzer/protocol/mqtt/mqtt-protocol.pac new file mode 100644 index 0000000000..0fc426cbd2 --- /dev/null +++ b/src/analyzer/protocol/mqtt/mqtt-protocol.pac @@ -0,0 +1,61 @@ +##! MQTT control packet parser, contributed by Supriya Sudharani Kumaraswamy + +enum MQTT_msg_type { + MQTT_RESERVED = 0, + MQTT_CONNECT = 1, + MQTT_CONNACK = 2, + MQTT_PUBLISH = 3, + MQTT_PUBACK = 4, + MQTT_PUBREC = 5, + MQTT_PUBREL = 6, + MQTT_PUBCOMP = 7, + MQTT_SUBSCRIBE = 8, + MQTT_SUBACK = 9, + MQTT_UNSUBSCRIBE = 10, + MQTT_UNSUBACK = 11, + MQTT_PINGREQ = 12, + MQTT_PINGRESP = 13, + MQTT_DISCONNECT = 14, +}; + +type MQTT_string = record { + len : uint16; + str : bytestring &length=len; +}; + +# These values are all defined in the commands/*.pac files... +type Command(pdu: MQTT_PDU, msg_type: uint8) = case msg_type of { + default -> unknown : empty; +}; + +type MQTT_PDU(is_orig: bool) = record { + fixed_header : uint8; + remaining_length : uint8[] &until(($element & 0x80) != 1); + command : Command(this, msg_type) &length=real_length; +} &let { + msg_type : uint8 = (fixed_header >> 4); + real_length = $context.connection.calc_header_length(remaining_length); +} &byteorder=bigendian; + +refine connection MQTT_Conn += { + # This implementation is ripped straight from the spec. + function calc_header_length(vals: uint8[]): uint32 + %{ + int multiplier = 1; + uint32_t value = 0; + for ( auto encoded_byte: *vals ) + { + value += (encoded_byte & 127) * multiplier; + multiplier *= 128; + if ( multiplier > 128*128*128 ) + { + // This is definitely a protocol violation + this->bro_analyzer()->ProtocolViolation("malformed 'remaining length'"); + return 0; + } + } + + return value; + %} +}; + diff --git a/src/analyzer/protocol/mqtt/mqtt.pac b/src/analyzer/protocol/mqtt/mqtt.pac new file mode 100644 index 0000000000..113198991b --- /dev/null +++ b/src/analyzer/protocol/mqtt/mqtt.pac @@ -0,0 +1,43 @@ +# Analyzer for MQTT Protocol + +%include binpac.pac +%include bro.pac + +%extern{ + #include "events.bif.h" + #include "types.bif.h" +%} + +analyzer MQTT withcontext { + connection: MQTT_Conn; + flow: MQTT_Flow; +}; + +# Our connection consists of two flows, one in each direction. +connection MQTT_Conn(bro_analyzer: BroAnalyzer) { + upflow = MQTT_Flow(true); + downflow = MQTT_Flow(false); +}; + +%include mqtt-protocol.pac + +flow MQTT_Flow(is_orig: bool) { + #flowunit = MQTT_PDU(is_orig) withcontext(connection, this); + datagram = MQTT_PDU(is_orig) withcontext(connection, this); +}; + +%include commands/connect.pac +%include commands/connack.pac +%include commands/publish.pac +%include commands/puback.pac +%include commands/pubrec.pac +%include commands/pubrel.pac +%include commands/pubcomp.pac +%include commands/subscribe.pac +%include commands/suback.pac +%include commands/unsuback.pac +%include commands/unsubscribe.pac +%include commands/disconnect.pac +%include commands/pingreq.pac +%include commands/pingresp.pac + diff --git a/src/analyzer/protocol/mqtt/types.bif b/src/analyzer/protocol/mqtt/types.bif new file mode 100644 index 0000000000..be464c2ff2 --- /dev/null +++ b/src/analyzer/protocol/mqtt/types.bif @@ -0,0 +1,4 @@ + +type MQTT::ConnectMsg: record; +type MQTT::ConnectAckMsg: record; +type MQTT::PublishMsg: record; diff --git a/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_connect.log b/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_connect.log new file mode 100644 index 0000000000..aaf42ce11b --- /dev/null +++ b/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_connect.log @@ -0,0 +1,11 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path mqtt_connect +#open 2019-07-29-16-44-12 +#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p proto_name proto_version client_id connect_status will_topic will_payload +#types time string addr port addr port string string string string string string +1461170590.509491 CHhAvVGS1DHFjwGM9 10.0.1.4 49327 198.41.30.241 1883 MQIsdp 3.1 paho/34AAE54A75D839566E Connection Accepted - - +1461170596.653525 ClEkJM2Vm5giqnMf4h 10.0.1.4 49330 198.41.30.241 1883 MQIsdp 3.1 paho/DDE4DDAF4108D3E363 Connection Accepted - - +#close 2019-07-29-16-44-12 diff --git a/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_publish.log b/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_publish.log new file mode 100644 index 0000000000..a13b804639 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_publish.log @@ -0,0 +1,12 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path mqtt_publish +#open 2019-07-29-16-44-12 +#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p from_client retain qos status topic payload +#types time string addr port addr port bool bool string string string string +1461170591.219981 CHhAvVGS1DHFjwGM9 10.0.1.4 49327 198.41.30.241 1883 F T at most once ok SampleTopic Hello from the Paho blocking client +1461170596.653674 ClEkJM2Vm5giqnMf4h 10.0.1.4 49330 198.41.30.241 1883 T F at most once ok SampleTopic Hello MQTT +1461170596.891281 CHhAvVGS1DHFjwGM9 10.0.1.4 49327 198.41.30.241 1883 F F at most once ok SampleTopic Hello MQTT +#close 2019-07-29-16-44-12 diff --git a/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_subscribe.log b/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_subscribe.log new file mode 100644 index 0000000000..8fa637a072 --- /dev/null +++ b/testing/btest/Baseline/scripts.base.protocols.mqtt.mqtt/mqtt_subscribe.log @@ -0,0 +1,10 @@ +#separator \x09 +#set_separator , +#empty_field (empty) +#unset_field - +#path mqtt_subscribe +#open 2019-07-29-16-44-12 +#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p action topic qos_level granted_qos_level ack +#types time string addr port addr port enum string count count bool +1461170590.745647 CHhAvVGS1DHFjwGM9 10.0.1.4 49327 198.41.30.241 1883 MQTT::SUBSCRIBE SampleTopic 0 0 T +#close 2019-07-29-16-44-12 diff --git a/testing/btest/Traces/mqtt.pcap b/testing/btest/Traces/mqtt.pcap new file mode 100644 index 0000000000..72c331129c Binary files /dev/null and b/testing/btest/Traces/mqtt.pcap differ diff --git a/testing/btest/scripts/base/protocols/mqtt/mqtt.test b/testing/btest/scripts/base/protocols/mqtt/mqtt.test new file mode 100644 index 0000000000..660fc370f4 --- /dev/null +++ b/testing/btest/scripts/base/protocols/mqtt/mqtt.test @@ -0,0 +1,6 @@ +# @TEST-EXEC: bro -b -r $TRACES/mqtt.pcap %INPUT >output +# @TEST-EXEC: btest-diff mqtt_connect.log +# @TEST-EXEC: btest-diff mqtt_subscribe.log +# @TEST-EXEC: btest-diff mqtt_publish.log + +@load base/protocols/mqtt