diff --git a/scripts/base/protocols/redis/main.zeek b/scripts/base/protocols/redis/main.zeek index ab917df8c9..f214390e91 100644 --- a/scripts/base/protocols/redis/main.zeek +++ b/scripts/base/protocols/redis/main.zeek @@ -41,6 +41,11 @@ export { end: count &optional; }; + type RESPVersion: enum { + RESP2, + RESP3 + }; + type State: record { ## Pending commands. pending: table[count] of Info; @@ -52,14 +57,34 @@ export { ## Each range is one or two elements, one meaning it's unbounded, two meaning ## it begins at one and ends at the second. no_reply_ranges: vector of NoReplyRange; + ## The command indexes (from current_command and current_reply) that will + ## not get responses no matter what. + skip_commands: set[count]; ## We store if this analyzer had a violation to avoid logging if so. ## This should not be super necessary, but worth a shot. violation: bool &default=F; + ## If we are in "subscribed" mode + subscribed_mode: bool &default=F; + ## The RESP version + resp_version: RESPVersion &default=RESP2; }; # Redis specifically mentions 10k commands as a good pipelining threshold, so # we'll piggyback on that. option max_pending_commands = 10000; + + # These commands enter subscribed mode + global enter_subscribed_mode = [KnownCommand_PSUBSCRIBE, + KnownCommand_SSUBSCRIBE, KnownCommand_SUBSCRIBE]; + + # These commands exit subscribed mode + global exit_subscribed_mode = [KnownCommand_RESET, KnownCommand_QUIT]; + + # These commands don't expect a response (ever) - their replies are out of band. + global no_response_commands = [KnownCommand_PSUBSCRIBE, + KnownCommand_PUNSUBSCRIBE, KnownCommand_SSUBSCRIBE, + KnownCommand_SUBSCRIBE, KnownCommand_SUNSUBSCRIBE, + KnownCommand_UNSUBSCRIBE]; } redef record connection += { @@ -122,6 +147,15 @@ function is_last_interval_closed(c: connection): bool c$redis_state$no_reply_ranges[-1]?$end; } +event Redis::hello_command(c: connection, hello: HelloCommand) + { + if ( ! c?$redis_state ) + make_new_state(c); + + if ( hello?$requested_resp_version && hello$requested_resp_version == "3" ) + c$redis_state$resp_version = RESP3; + } + event Redis::command(c: connection, cmd: Command) { if ( ! c?$redis_state ) @@ -139,6 +173,26 @@ event Redis::command(c: connection, cmd: Command) } ++c$redis_state$current_command; + + if ( cmd?$known ) + { + if ( c$redis_state$resp_version == RESP2 ) + { + local should_enter = cmd$known in enter_subscribed_mode; + local should_exit = cmd$known in exit_subscribed_mode; + c$redis_state$subscribed_mode = should_enter && ! should_exit; + + # It's weird if it's in both - in the future users may be able to add that + if ( should_enter && should_exit ) + Reporter::conn_weird("Redis_command_enter_exit_subscribed_mode", c, cat( + cmd$known)); + } + if ( cmd$known in no_response_commands || c$redis_state$subscribed_mode ) + { + add c$redis_state$skip_commands[c$redis_state$current_command]; + } + } + # CLIENT commands can skip a number of replies and may be used with # pipelining. We need special logic in order to track the command/reply # pairs. @@ -177,6 +231,7 @@ event Redis::command(c: connection, cmd: Command) } } } + set_state(c, T); c$redis$cmd = cmd; @@ -187,17 +242,24 @@ event Redis::command(c: connection, cmd: Command) function reply_num(c: connection): count { local resp_num = c$redis_state$current_reply + 1; + local result = resp_num; for ( i in c$redis_state$no_reply_ranges ) { local range = c$redis_state$no_reply_ranges[i]; if ( ! range?$end && resp_num > range$begin ) { } # TODO: This is necessary if not using pipelining if ( range?$end && resp_num >= range$begin && resp_num < range$end ) - return range$end; + result = range$end; } - # Default: no disable/enable shenanigans - return resp_num; + # Account for commands that don't expect a response + while ( result in c$redis_state$skip_commands ) + { + delete c$redis_state$skip_commands[result]; + result += 1; + } + + return result; } # Logs up to and including the last seen command from the last reply @@ -234,6 +296,11 @@ event Redis::reply(c: connection, data: ReplyData) if ( ! c?$redis_state ) make_new_state(c); + if ( c$redis_state$subscribed_mode ) + { + event server_push(c, data); + return; + } local previous_reply_num = c$redis_state$current_reply; c$redis_state$current_reply = reply_num(c); set_state(c, F); @@ -241,6 +308,10 @@ event Redis::reply(c: connection, data: ReplyData) c$redis$reply = data; c$redis$success = T; log_from(c, previous_reply_num); + + # Tidy up the skip_commands when it's up to date + if ( c$redis_state$current_command == c$redis_state$current_reply ) + clear_table(c$redis_state$skip_commands); } event Redis::error(c: connection, data: ReplyData) diff --git a/scripts/base/protocols/redis/spicy-events.zeek b/scripts/base/protocols/redis/spicy-events.zeek index ad9a500f18..5eb9f6d6d6 100644 --- a/scripts/base/protocols/redis/spicy-events.zeek +++ b/scripts/base/protocols/redis/spicy-events.zeek @@ -37,6 +37,12 @@ export { password: string; }; + ## The Redis HELLO command (handshake). + type HelloCommand: record { + ## The sent requested RESP version, such as "2" or "3" + requested_resp_version: string &optional; + }; + ## A generic Redis command from the client. type Command: record { ## The raw command, exactly as parsed @@ -79,6 +85,13 @@ global get_command: event(c: connection, key: string); ## command: The AUTH command sent to the server and its data. global auth_command: event(c: connection, command: AuthCommand); +## Generated for Redis HELLO commands sent to the Redis server. +## +## c: The connection. +## +## command: The HELLO command sent to the server and its data. +global hello_command: event(c: connection, command: HelloCommand); + ## Generated for every command sent by the client to the Redis server. ## ## c: The connection. @@ -87,11 +100,15 @@ global auth_command: event(c: connection, command: AuthCommand); global command: event(c: connection, cmd: Command); ## Generated for every successful response sent by the Redis server to the -## client. +## client. For RESP2, this includes "push" messages, which are out of band. +## These will also raise a server_push event. RESP3 push messages will only +## raise a server_push event. ## ## c: The connection. ## ## data: The server data sent to the client. +## +## .. zeek:see:: Redis::server_push global reply: event(c: connection, data: ReplyData); ## Generated for every error response sent by the Redis server to the @@ -101,3 +118,11 @@ global reply: event(c: connection, data: ReplyData); ## ## data: The server data sent to the client. global error: event(c: connection, data: ReplyData); + +## Generated for out-of-band data, outside of the request-response +## model. +## +## c: The connection. +## +## data: The server data sent to the client. +global server_push: event(c: connection, data: ReplyData); diff --git a/src/analyzer/protocol/redis/redis.spicy b/src/analyzer/protocol/redis/redis.spicy index 945ae8c83d..777857f81d 100644 --- a/src/analyzer/protocol/redis/redis.spicy +++ b/src/analyzer/protocol/redis/redis.spicy @@ -34,6 +34,7 @@ public type KnownCommand = enum { GETRANGE, GETSET, HDEL, + HELLO, HGET, HSET, INCR, @@ -43,11 +44,19 @@ public type KnownCommand = enum { MOVE, MSET, PERSIST, + PSUBSCRIBE, + PUNSUBSCRIBE, + QUIT, RENAME, + RESET, SET, STRLEN, + SUBSCRIBE, + SSUBSCRIBE, + SUNSUBSCRIBE, TTL, TYPE, + UNSUBSCRIBE, }; type Command = struct { @@ -228,7 +237,6 @@ function parse_command(raw: vector): Command { function command_from(cmd_bytes: bytes): optional { local cmd: optional = Null; switch (cmd_bytes.lower()) { - case b"set": cmd = KnownCommand::SET; case b"append": cmd = KnownCommand::APPEND; case b"auth": cmd = KnownCommand::AUTH; case b"bitcount": cmd = KnownCommand::BITCOUNT; @@ -257,6 +265,7 @@ function command_from(cmd_bytes: bytes): optional { case b"getrange": cmd = KnownCommand::GETRANGE; case b"getset": cmd = KnownCommand::GETSET; case b"hdel": cmd = KnownCommand::HDEL; + case b"hello": cmd = KnownCommand::HELLO; case b"hget": cmd = KnownCommand::HGET; case b"hset": cmd = KnownCommand::HSET; case b"incr": cmd = KnownCommand::INCR; @@ -266,10 +275,19 @@ function command_from(cmd_bytes: bytes): optional { case b"move": cmd = KnownCommand::MOVE; case b"mset": cmd = KnownCommand::MSET; case b"persist": cmd = KnownCommand::PERSIST; + case b"psubscribe": cmd = KnownCommand::PSUBSCRIBE; + case b"punsubscribe": cmd = KnownCommand::PUNSUBSCRIBE; + case b"quit": cmd = KnownCommand::QUIT; case b"rename": cmd = KnownCommand::RENAME; + case b"reset": cmd = KnownCommand::RESET; + case b"set": cmd = KnownCommand::SET; case b"strlen": cmd = KnownCommand::STRLEN; + case b"ssubscribe": cmd = KnownCommand::SSUBSCRIBE; + case b"subscribe": cmd = KnownCommand::SUBSCRIBE; + case b"sunsubscribe": cmd = KnownCommand::SUNSUBSCRIBE; case b"ttl": cmd = KnownCommand::TTL; case b"type": cmd = KnownCommand::TYPE; + case b"unsubscribe": cmd = KnownCommand::UNSUBSCRIBE; default: cmd = Null; } @@ -368,12 +386,40 @@ public function is_auth(data: RESP::ClientData): bool { return data.command.known && *(data.command.known) == KnownCommand::AUTH && |data.command.raw| >= 2; } +type Hello = struct { + requested_resp_version: optional; +}; + +public function make_hello(command: Command): Hello { + local hi: Hello = [$requested_resp_version = Null]; + if (|command.raw| > 1) + hi.requested_resp_version = command.raw[1]; + return hi; +} + +public function is_hello(data: RESP::ClientData): bool { + return data.command.known && *(data.command.known) == KnownCommand::HELLO; +} + type ReplyData = struct { value: optional; }; -public function is_err(server_data: RESP::ServerData): bool { - return server_data.data?.simple_error || server_data.data?.bulk_error; +public type ReplyType = enum { + Reply, # A response to a command + Error, # An error response to a command + Push, # A server message that is not responding to a command +}; + +public function classify(server_data: RESP::ServerData): ReplyType { + if (server_data.data?.simple_error || server_data.data?.bulk_error) + return ReplyType::Error; + + # We can tell with RESP3 this is push here, but RESP2 relies on scripts + if (server_data.data?.push) + return ReplyType::Push; + + return ReplyType::Reply; } function bulk_string_content(bulk: RESP::BulkString): bytes { @@ -397,10 +443,10 @@ function stringify(data: RESP::Data): optional { return bulk_string_content(data.verbatim_string); else if (data?.boolean) return data.boolean.val ? b"T" : b"F"; - else if (data?.array) { + else if (data?.array || data?.push) { local res = b"["; local first = True; - for (ele in data.array.elements) { + for (ele in data?.array ? data.array.elements : data.push.elements) { if (!first) res += b", "; local ele_stringified = stringify(ele); diff --git a/src/analyzer/protocol/redis/resp.evt b/src/analyzer/protocol/redis/resp.evt index f67f8fbc1c..c84ab36084 100644 --- a/src/analyzer/protocol/redis/resp.evt +++ b/src/analyzer/protocol/redis/resp.evt @@ -12,9 +12,14 @@ export Redis::KnownCommand; on RESP::ClientData if ( Redis::is_set(self) ) -> event Redis::set_command($conn, Redis::make_set(self.command)); on RESP::ClientData if ( Redis::is_get(self) ) -> event Redis::get_command($conn, Redis::make_get(self.command).key); on RESP::ClientData if ( Redis::is_auth(self) ) -> event Redis::auth_command($conn, Redis::make_auth(self.command)); +on RESP::ClientData if ( Redis::is_hello(self) ) -> event Redis::hello_command($conn, Redis::make_hello(self.command)); # All client data is a command on RESP::ClientData -> event Redis::command($conn, self.command); -on RESP::ServerData if ( ! Redis::is_err(self) ) -> event Redis::reply($conn, Redis::make_server_reply(self)); -on RESP::ServerData if ( Redis::is_err(self) ) -> event Redis::error($conn, Redis::make_server_reply(self)); +on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Reply ) -> + event Redis::reply($conn, Redis::make_server_reply(self)); +on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Error ) -> + event Redis::error($conn, Redis::make_server_reply(self)); +on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Push ) -> + event Redis::server_push($conn, Redis::make_server_reply(self)); diff --git a/src/analyzer/protocol/redis/resp.spicy b/src/analyzer/protocol/redis/resp.spicy index 3216fd6d9b..2fb73277dc 100644 --- a/src/analyzer/protocol/redis/resp.spicy +++ b/src/analyzer/protocol/redis/resp.spicy @@ -79,6 +79,11 @@ public type ServerData = unit { %synchronize-after = b"\x0d\x0a"; var depth: uint8& = new uint8; data: Data(self.depth); + + var type_: Redis::ReplyType; + on %done { + self.type_ = Redis::classify(self); + } }; type Data = unit(depth: uint8&) { diff --git a/testing/btest/Baseline/coverage.record-fields/out.default b/testing/btest/Baseline/coverage.record-fields/out.default index 1be477339c..213a4bc1e0 100644 --- a/testing/btest/Baseline/coverage.record-fields/out.default +++ b/testing/btest/Baseline/coverage.record-fields/out.default @@ -615,6 +615,9 @@ connection { } * pending: table[count] of record Redis::Info, log=F, optional=F Redis::Info { ... } + * resp_version: enum Redis::RESPVersion, log=F, optional=T + * skip_commands: set[count], log=F, optional=F + * subscribed_mode: bool, log=F, optional=T * violation: bool, log=F, optional=T } * removal_hooks: set[func], log=F, optional=T diff --git a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub-resp3/output b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub-resp3/output new file mode 100644 index 0000000000..60f704875f --- /dev/null +++ b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub-resp3/output @@ -0,0 +1,6 @@ +### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Got published data!, [value=] +Got published data!, [value=] +Got published data!, [value=[message, Foo, Hi:)]] +Got published data!, [value=[pmessage, F*, Foo, Hi:)]] +Got published data!, [value=[pmessage, F*, Foobar, Hello!]] diff --git a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output index 49d861c74c..460df0d529 100644 --- a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output +++ b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output @@ -1 +1,6 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Got published data!, [value=] +Got published data!, [value=] +Got published data!, [value=[message, Foo, Hi there :)]] +Got published data!, [value=[pmessage, F*, Foo, Hi there :)]] +Got published data!, [value=[pmessage, F*, FeeFooFiiFum, Hello! :)]] diff --git a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log index c9d2b81974..11ed195257 100644 --- a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log +++ b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log @@ -7,7 +7,11 @@ #open XXXX-XX-XX-XX-XX-XX #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p cmd.name cmd.key cmd.value success reply.value #types time string addr port addr port string string string bool string -XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 SUBSCRIBE - - T - -XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 56163 127.0.0.1 6379 PUBLISH - - T - -XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 - - - T [message, my_channel, hello :)] +XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 60833 127.0.0.1 6379 PUBLISH - - T - +XXXXXXXXXX.XXXXXX C4J4Th3PJpwUYZZ6gc 127.0.0.1 60837 127.0.0.1 6379 PUBLISH - - T - +XXXXXXXXXX.XXXXXX CtPZjS20MLrsMUOJi2 127.0.0.1 60838 127.0.0.1 6379 SET sanity_check you_are_sane T OK +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 SUBSCRIBE - - - - +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 PSUBSCRIBE - - - - +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 RESET - - T RESET +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 GET sanity_check - T you_are_sane #close XXXX-XX-XX-XX-XX-XX diff --git a/testing/btest/Traces/redis/pubsub-resp3.pcap b/testing/btest/Traces/redis/pubsub-resp3.pcap new file mode 100644 index 0000000000..de591be1ce Binary files /dev/null and b/testing/btest/Traces/redis/pubsub-resp3.pcap differ diff --git a/testing/btest/Traces/redis/pubsub.pcap b/testing/btest/Traces/redis/pubsub.pcap index 458070f948..d02630a7a4 100644 Binary files a/testing/btest/Traces/redis/pubsub.pcap and b/testing/btest/Traces/redis/pubsub.pcap differ diff --git a/testing/btest/scripts/base/protocols/redis/pubsub-resp3.zeek b/testing/btest/scripts/base/protocols/redis/pubsub-resp3.zeek new file mode 100644 index 0000000000..59d324edbe --- /dev/null +++ b/testing/btest/scripts/base/protocols/redis/pubsub-resp3.zeek @@ -0,0 +1,16 @@ +# @TEST-DOC: Test Zeek parsing pubsub commands in RESP3 +# @TEST-REQUIRES: have-spicy +# +# @TEST-EXEC: zeek -b -r $TRACES/redis/pubsub-resp3.pcap %INPUT >output +# @TEST-EXEC: btest-diff output + +# Test pub/sub from Redis. This has two subscribers, one using a pattern. Then, the +# messages that were published get printed to output. + +@load base/protocols/redis + +event Redis::server_push(c: connection, data: Redis::ReplyData) + { + # The first 2 are SUBSCRIBE replies, the other 3 are message and pmessage + print "Got published data!", data; + } diff --git a/testing/btest/scripts/base/protocols/redis/pubsub.zeek b/testing/btest/scripts/base/protocols/redis/pubsub.zeek index 15f6d9bb59..bd79325c86 100644 --- a/testing/btest/scripts/base/protocols/redis/pubsub.zeek +++ b/testing/btest/scripts/base/protocols/redis/pubsub.zeek @@ -5,8 +5,12 @@ # @TEST-EXEC: btest-diff output # @TEST-EXEC: btest-diff redis.log -# Testing the example of pub sub in REDIS docs: -# https://redis.io/docs/latest/develop/interact/pubsub/ -# These are just commands between two different clients, one PUBLISH and one SUBSCRIBE +# Test pub/sub from Redis. This has two subscribers, one using a pattern. Then, the +# messages that were published get printed to output. @load base/protocols/redis + +event Redis::server_push(c: connection, data: Redis::ReplyData) + { + print "Got published data!", data; + }