mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
GH-474: use topic vectors for MQTT (un)subscribe events/logs
This commit is contained in:
parent
649d9f502b
commit
c43e809a69
5 changed files with 40 additions and 27 deletions
|
@ -50,10 +50,10 @@ export {
|
||||||
|
|
||||||
## Indicates if a subscribe or unsubscribe action is taking place
|
## Indicates if a subscribe or unsubscribe action is taking place
|
||||||
action: SubUnsub &log;
|
action: SubUnsub &log;
|
||||||
## The topic (or topic pattern) being subscribed to
|
## The topics (or topic patterns) being subscribed to
|
||||||
topic: string &log;
|
topics: string_vec &log;
|
||||||
## QoS level requested for messages from subscribed topics
|
## QoS levels requested for messages from subscribed topics
|
||||||
qos_level: count &log &optional;
|
qos_levels: index_vec &log &optional;
|
||||||
## QoS level the server granted
|
## QoS level the server granted
|
||||||
granted_qos_level: count &log &optional;
|
granted_qos_level: count &log &optional;
|
||||||
## Indicates if the request was acked by the server
|
## Indicates if the request was acked by the server
|
||||||
|
@ -284,7 +284,7 @@ event mqtt_pubcomp(c: connection, is_orig: bool, msg_id: count) &priority=-5
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
event mqtt_subscribe(c: connection, msg_id: count, topic: string, requested_qos: count) &priority=5
|
event mqtt_subscribe(c: connection, msg_id: count, topics: string_vec, requested_qos: index_vec) &priority=5
|
||||||
{
|
{
|
||||||
local info = set_session(c);
|
local info = set_session(c);
|
||||||
|
|
||||||
|
@ -292,8 +292,8 @@ event mqtt_subscribe(c: connection, msg_id: count, topic: string, requested_qos:
|
||||||
$uid = c$uid,
|
$uid = c$uid,
|
||||||
$id = c$id,
|
$id = c$id,
|
||||||
$action = MQTT::SUBSCRIBE,
|
$action = MQTT::SUBSCRIBE,
|
||||||
$topic = topic,
|
$topics = topics,
|
||||||
$qos_level = requested_qos);
|
$qos_levels = requested_qos);
|
||||||
|
|
||||||
c$mqtt_state$subscribe[msg_id] = si;
|
c$mqtt_state$subscribe[msg_id] = si;
|
||||||
}
|
}
|
||||||
|
@ -313,7 +313,7 @@ event mqtt_suback(c: connection, msg_id: count, granted_qos: count) &priority=5
|
||||||
delete c$mqtt_state$subscribe[msg_id];
|
delete c$mqtt_state$subscribe[msg_id];
|
||||||
}
|
}
|
||||||
|
|
||||||
event mqtt_unsubscribe(c: connection, msg_id: count, topic: string) &priority=5
|
event mqtt_unsubscribe(c: connection, msg_id: count, topics: string_vec) &priority=5
|
||||||
{
|
{
|
||||||
set_session(c);
|
set_session(c);
|
||||||
|
|
||||||
|
@ -321,7 +321,7 @@ event mqtt_unsubscribe(c: connection, msg_id: count, topic: string) &priority=5
|
||||||
$uid = c$uid,
|
$uid = c$uid,
|
||||||
$id = c$id,
|
$id = c$id,
|
||||||
$action = MQTT::UNSUBSCRIBE,
|
$action = MQTT::UNSUBSCRIBE,
|
||||||
$topic = topic);
|
$topics = topics);
|
||||||
|
|
||||||
c$mqtt_state$subscribe[msg_id] = si;
|
c$mqtt_state$subscribe[msg_id] = si;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,17 +19,23 @@ refine flow MQTT_Flow += {
|
||||||
%{
|
%{
|
||||||
if ( mqtt_subscribe )
|
if ( mqtt_subscribe )
|
||||||
{
|
{
|
||||||
|
auto topics = new VectorVal(string_vec);
|
||||||
|
auto qos_levels = new VectorVal(index_vec);
|
||||||
|
|
||||||
for (auto topic: *${msg.topics})
|
for (auto topic: *${msg.topics})
|
||||||
{
|
{
|
||||||
auto subscribe_topic = new StringVal(${topic.name.str}.length(),
|
auto subscribe_topic = new StringVal(${topic.name.str}.length(),
|
||||||
reinterpret_cast<const char*>(${topic.name.str}.begin()));
|
reinterpret_cast<const char*>(${topic.name.str}.begin()));
|
||||||
|
auto qos = val_mgr->GetCount(${topic.requested_QoS});
|
||||||
BifEvent::generate_mqtt_subscribe(connection()->bro_analyzer(),
|
topics->Assign(topics->Size(), subscribe_topic);
|
||||||
connection()->bro_analyzer()->Conn(),
|
qos_levels->Assign(qos_levels->Size(), qos);
|
||||||
${msg.msg_id},
|
|
||||||
subscribe_topic,
|
|
||||||
${topic.requested_QoS});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BifEvent::generate_mqtt_subscribe(connection()->bro_analyzer(),
|
||||||
|
connection()->bro_analyzer()->Conn(),
|
||||||
|
${msg.msg_id},
|
||||||
|
topics,
|
||||||
|
qos_levels);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -14,16 +14,19 @@ refine flow MQTT_Flow += {
|
||||||
%{
|
%{
|
||||||
if ( mqtt_unsubscribe )
|
if ( mqtt_unsubscribe )
|
||||||
{
|
{
|
||||||
|
auto topics = new VectorVal(string_vec);
|
||||||
|
|
||||||
for (auto topic: *${msg.topics})
|
for (auto topic: *${msg.topics})
|
||||||
{
|
{
|
||||||
auto unsubscribe_topic = new StringVal(${topic.str}.length(),
|
auto unsubscribe_topic = new StringVal(${topic.str}.length(),
|
||||||
reinterpret_cast<const char*>(${topic.str}.begin()));
|
reinterpret_cast<const char*>(${topic.str}.begin()));
|
||||||
|
topics->Assign(topics->Size(), unsubscribe_topic);
|
||||||
BifEvent::generate_mqtt_unsubscribe(connection()->bro_analyzer(),
|
|
||||||
connection()->bro_analyzer()->Conn(),
|
|
||||||
${msg.msg_id},
|
|
||||||
unsubscribe_topic);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BifEvent::generate_mqtt_unsubscribe(connection()->bro_analyzer(),
|
||||||
|
connection()->bro_analyzer()->Conn(),
|
||||||
|
${msg.msg_id},
|
||||||
|
topics);
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -64,7 +64,11 @@ event mqtt_pubcomp%(c: connection, is_orig: bool, msg_id: count%);
|
||||||
## is_orig: Direction in which the message was sent
|
## is_orig: Direction in which the message was sent
|
||||||
##
|
##
|
||||||
## msg_id: The id value for the message.
|
## msg_id: The id value for the message.
|
||||||
event mqtt_subscribe%(c: connection, msg_id: count, topic: string, requested_qos: count%);
|
##
|
||||||
|
## topics: The topics being subscribed to
|
||||||
|
##
|
||||||
|
## requested_qos: The desired QoS option associated with each topic.
|
||||||
|
event mqtt_subscribe%(c: connection, msg_id: count, topics: string_vec, requested_qos: index_vec%);
|
||||||
|
|
||||||
## Generated for MQTT subscribe messages
|
## Generated for MQTT subscribe messages
|
||||||
##
|
##
|
||||||
|
@ -81,8 +85,8 @@ event mqtt_suback%(c: connection, msg_id: count, granted_qos: count%);
|
||||||
##
|
##
|
||||||
## msg_id: The id value for the message.
|
## msg_id: The id value for the message.
|
||||||
##
|
##
|
||||||
## topic: The topic being unsubscribed from
|
## topics: The topics being unsubscribed from
|
||||||
event mqtt_unsubscribe%(c: connection, msg_id: count, topic: string%);
|
event mqtt_unsubscribe%(c: connection, msg_id: count, topics: string_vec%);
|
||||||
|
|
||||||
## Generated for MQTT unsubscribe acknowledgements sent by the server
|
## Generated for MQTT unsubscribe acknowledgements sent by the server
|
||||||
##
|
##
|
||||||
|
|
|
@ -3,8 +3,8 @@
|
||||||
#empty_field (empty)
|
#empty_field (empty)
|
||||||
#unset_field -
|
#unset_field -
|
||||||
#path mqtt_subscribe
|
#path mqtt_subscribe
|
||||||
#open 2019-07-29-16-44-12
|
#open 2019-08-02-20-39-45
|
||||||
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p action topic qos_level granted_qos_level ack
|
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p action topics qos_levels granted_qos_level ack
|
||||||
#types time string addr port addr port enum string count count bool
|
#types time string addr port addr port enum vector[string] vector[count] count bool
|
||||||
1461170590.745647 CHhAvVGS1DHFjwGM9 10.0.1.4 49327 198.41.30.241 1883 MQTT::SUBSCRIBE SampleTopic 0 0 T
|
1461170590.745647 CHhAvVGS1DHFjwGM9 10.0.1.4 49327 198.41.30.241 1883 MQTT::SUBSCRIBE SampleTopic 0 0 T
|
||||||
#close 2019-07-29-16-44-12
|
#close 2019-08-02-20-39-45
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue