Handle Redis protocol message separately

Closes #4504

Messages are not typical responses, so they need special handling. This
is different between RESP2 and 3, so this is the first instance where
the script layer needs to tell the difference.
This commit is contained in:
Evan Typanski 2025-06-10 16:00:22 -04:00
parent 8b914f4714
commit a4ce682bc9
13 changed files with 207 additions and 17 deletions

View file

@ -41,6 +41,11 @@ export {
end: count &optional; end: count &optional;
}; };
type RESPVersion: enum {
RESP2,
RESP3
};
type State: record { type State: record {
## Pending commands. ## Pending commands.
pending: table[count] of Info; pending: table[count] of Info;
@ -52,14 +57,34 @@ export {
## Each range is one or two elements, one meaning it's unbounded, two meaning ## Each range is one or two elements, one meaning it's unbounded, two meaning
## it begins at one and ends at the second. ## it begins at one and ends at the second.
no_reply_ranges: vector of NoReplyRange; no_reply_ranges: vector of NoReplyRange;
## The command indexes (from current_command and current_reply) that will
## not get responses no matter what.
skip_commands: set[count];
## We store if this analyzer had a violation to avoid logging if so. ## We store if this analyzer had a violation to avoid logging if so.
## This should not be super necessary, but worth a shot. ## This should not be super necessary, but worth a shot.
violation: bool &default=F; violation: bool &default=F;
## If we are in "subscribed" mode
subscribed_mode: bool &default=F;
## The RESP version
resp_version: RESPVersion &default=RESP2;
}; };
# Redis specifically mentions 10k commands as a good pipelining threshold, so # Redis specifically mentions 10k commands as a good pipelining threshold, so
# we'll piggyback on that. # we'll piggyback on that.
option max_pending_commands = 10000; option max_pending_commands = 10000;
# These commands enter subscribed mode
global enter_subscribed_mode = [KnownCommand_PSUBSCRIBE,
KnownCommand_SSUBSCRIBE, KnownCommand_SUBSCRIBE];
# These commands exit subscribed mode
global exit_subscribed_mode = [KnownCommand_RESET, KnownCommand_QUIT];
# These commands don't expect a response (ever) - their replies are out of band.
global no_response_commands = [KnownCommand_PSUBSCRIBE,
KnownCommand_PUNSUBSCRIBE, KnownCommand_SSUBSCRIBE,
KnownCommand_SUBSCRIBE, KnownCommand_SUNSUBSCRIBE,
KnownCommand_UNSUBSCRIBE];
} }
redef record connection += { redef record connection += {
@ -122,6 +147,15 @@ function is_last_interval_closed(c: connection): bool
c$redis_state$no_reply_ranges[-1]?$end; c$redis_state$no_reply_ranges[-1]?$end;
} }
event Redis::hello_command(c: connection, hello: HelloCommand)
{
if ( ! c?$redis_state )
make_new_state(c);
if ( hello?$requested_resp_version && hello$requested_resp_version == "3" )
c$redis_state$resp_version = RESP3;
}
event Redis::command(c: connection, cmd: Command) event Redis::command(c: connection, cmd: Command)
{ {
if ( ! c?$redis_state ) if ( ! c?$redis_state )
@ -139,6 +173,26 @@ event Redis::command(c: connection, cmd: Command)
} }
++c$redis_state$current_command; ++c$redis_state$current_command;
if ( cmd?$known )
{
if ( c$redis_state$resp_version == RESP2 )
{
local should_enter = cmd$known in enter_subscribed_mode;
local should_exit = cmd$known in exit_subscribed_mode;
c$redis_state$subscribed_mode = should_enter && ! should_exit;
# It's weird if it's in both - in the future users may be able to add that
if ( should_enter && should_exit )
Reporter::conn_weird("Redis_command_enter_exit_subscribed_mode", c, cat(
cmd$known));
}
if ( cmd$known in no_response_commands || c$redis_state$subscribed_mode )
{
add c$redis_state$skip_commands[c$redis_state$current_command];
}
}
# CLIENT commands can skip a number of replies and may be used with # CLIENT commands can skip a number of replies and may be used with
# pipelining. We need special logic in order to track the command/reply # pipelining. We need special logic in order to track the command/reply
# pairs. # pairs.
@ -177,6 +231,7 @@ event Redis::command(c: connection, cmd: Command)
} }
} }
} }
set_state(c, T); set_state(c, T);
c$redis$cmd = cmd; c$redis$cmd = cmd;
@ -187,17 +242,24 @@ event Redis::command(c: connection, cmd: Command)
function reply_num(c: connection): count function reply_num(c: connection): count
{ {
local resp_num = c$redis_state$current_reply + 1; local resp_num = c$redis_state$current_reply + 1;
local result = resp_num;
for ( i in c$redis_state$no_reply_ranges ) for ( i in c$redis_state$no_reply_ranges )
{ {
local range = c$redis_state$no_reply_ranges[i]; local range = c$redis_state$no_reply_ranges[i];
if ( ! range?$end && resp_num > range$begin ) if ( ! range?$end && resp_num > range$begin )
{ } # TODO: This is necessary if not using pipelining { } # TODO: This is necessary if not using pipelining
if ( range?$end && resp_num >= range$begin && resp_num < range$end ) if ( range?$end && resp_num >= range$begin && resp_num < range$end )
return range$end; result = range$end;
} }
# Default: no disable/enable shenanigans # Account for commands that don't expect a response
return resp_num; while ( result in c$redis_state$skip_commands )
{
delete c$redis_state$skip_commands[result];
result += 1;
}
return result;
} }
# Logs up to and including the last seen command from the last reply # Logs up to and including the last seen command from the last reply
@ -234,6 +296,11 @@ event Redis::reply(c: connection, data: ReplyData)
if ( ! c?$redis_state ) if ( ! c?$redis_state )
make_new_state(c); make_new_state(c);
if ( c$redis_state$subscribed_mode )
{
event server_push(c, data);
return;
}
local previous_reply_num = c$redis_state$current_reply; local previous_reply_num = c$redis_state$current_reply;
c$redis_state$current_reply = reply_num(c); c$redis_state$current_reply = reply_num(c);
set_state(c, F); set_state(c, F);
@ -241,6 +308,10 @@ event Redis::reply(c: connection, data: ReplyData)
c$redis$reply = data; c$redis$reply = data;
c$redis$success = T; c$redis$success = T;
log_from(c, previous_reply_num); log_from(c, previous_reply_num);
# Tidy up the skip_commands when it's up to date
if ( c$redis_state$current_command == c$redis_state$current_reply )
clear_table(c$redis_state$skip_commands);
} }
event Redis::error(c: connection, data: ReplyData) event Redis::error(c: connection, data: ReplyData)

View file

@ -37,6 +37,12 @@ export {
password: string; password: string;
}; };
## The Redis HELLO command (handshake).
type HelloCommand: record {
## The sent requested RESP version, such as "2" or "3"
requested_resp_version: string &optional;
};
## A generic Redis command from the client. ## A generic Redis command from the client.
type Command: record { type Command: record {
## The raw command, exactly as parsed ## The raw command, exactly as parsed
@ -79,6 +85,13 @@ global get_command: event(c: connection, key: string);
## command: The AUTH command sent to the server and its data. ## command: The AUTH command sent to the server and its data.
global auth_command: event(c: connection, command: AuthCommand); global auth_command: event(c: connection, command: AuthCommand);
## Generated for Redis HELLO commands sent to the Redis server.
##
## c: The connection.
##
## command: The HELLO command sent to the server and its data.
global hello_command: event(c: connection, command: HelloCommand);
## Generated for every command sent by the client to the Redis server. ## Generated for every command sent by the client to the Redis server.
## ##
## c: The connection. ## c: The connection.
@ -87,11 +100,15 @@ global auth_command: event(c: connection, command: AuthCommand);
global command: event(c: connection, cmd: Command); global command: event(c: connection, cmd: Command);
## Generated for every successful response sent by the Redis server to the ## Generated for every successful response sent by the Redis server to the
## client. ## client. For RESP2, this includes "push" messages, which are out of band.
## These will also raise a server_push event. RESP3 push messages will only
## raise a server_push event.
## ##
## c: The connection. ## c: The connection.
## ##
## data: The server data sent to the client. ## data: The server data sent to the client.
##
## .. zeek:see:: Redis::server_push
global reply: event(c: connection, data: ReplyData); global reply: event(c: connection, data: ReplyData);
## Generated for every error response sent by the Redis server to the ## Generated for every error response sent by the Redis server to the
@ -101,3 +118,11 @@ global reply: event(c: connection, data: ReplyData);
## ##
## data: The server data sent to the client. ## data: The server data sent to the client.
global error: event(c: connection, data: ReplyData); global error: event(c: connection, data: ReplyData);
## Generated for out-of-band data, outside of the request-response
## model.
##
## c: The connection.
##
## data: The server data sent to the client.
global server_push: event(c: connection, data: ReplyData);

View file

@ -34,6 +34,7 @@ public type KnownCommand = enum {
GETRANGE, GETRANGE,
GETSET, GETSET,
HDEL, HDEL,
HELLO,
HGET, HGET,
HSET, HSET,
INCR, INCR,
@ -43,11 +44,19 @@ public type KnownCommand = enum {
MOVE, MOVE,
MSET, MSET,
PERSIST, PERSIST,
PSUBSCRIBE,
PUNSUBSCRIBE,
QUIT,
RENAME, RENAME,
RESET,
SET, SET,
STRLEN, STRLEN,
SUBSCRIBE,
SSUBSCRIBE,
SUNSUBSCRIBE,
TTL, TTL,
TYPE, TYPE,
UNSUBSCRIBE,
}; };
type Command = struct { type Command = struct {
@ -228,7 +237,6 @@ function parse_command(raw: vector<bytes>): Command {
function command_from(cmd_bytes: bytes): optional<KnownCommand> { function command_from(cmd_bytes: bytes): optional<KnownCommand> {
local cmd: optional<KnownCommand> = Null; local cmd: optional<KnownCommand> = Null;
switch (cmd_bytes.lower()) { switch (cmd_bytes.lower()) {
case b"set": cmd = KnownCommand::SET;
case b"append": cmd = KnownCommand::APPEND; case b"append": cmd = KnownCommand::APPEND;
case b"auth": cmd = KnownCommand::AUTH; case b"auth": cmd = KnownCommand::AUTH;
case b"bitcount": cmd = KnownCommand::BITCOUNT; case b"bitcount": cmd = KnownCommand::BITCOUNT;
@ -257,6 +265,7 @@ function command_from(cmd_bytes: bytes): optional<KnownCommand> {
case b"getrange": cmd = KnownCommand::GETRANGE; case b"getrange": cmd = KnownCommand::GETRANGE;
case b"getset": cmd = KnownCommand::GETSET; case b"getset": cmd = KnownCommand::GETSET;
case b"hdel": cmd = KnownCommand::HDEL; case b"hdel": cmd = KnownCommand::HDEL;
case b"hello": cmd = KnownCommand::HELLO;
case b"hget": cmd = KnownCommand::HGET; case b"hget": cmd = KnownCommand::HGET;
case b"hset": cmd = KnownCommand::HSET; case b"hset": cmd = KnownCommand::HSET;
case b"incr": cmd = KnownCommand::INCR; case b"incr": cmd = KnownCommand::INCR;
@ -266,10 +275,19 @@ function command_from(cmd_bytes: bytes): optional<KnownCommand> {
case b"move": cmd = KnownCommand::MOVE; case b"move": cmd = KnownCommand::MOVE;
case b"mset": cmd = KnownCommand::MSET; case b"mset": cmd = KnownCommand::MSET;
case b"persist": cmd = KnownCommand::PERSIST; case b"persist": cmd = KnownCommand::PERSIST;
case b"psubscribe": cmd = KnownCommand::PSUBSCRIBE;
case b"punsubscribe": cmd = KnownCommand::PUNSUBSCRIBE;
case b"quit": cmd = KnownCommand::QUIT;
case b"rename": cmd = KnownCommand::RENAME; case b"rename": cmd = KnownCommand::RENAME;
case b"reset": cmd = KnownCommand::RESET;
case b"set": cmd = KnownCommand::SET;
case b"strlen": cmd = KnownCommand::STRLEN; case b"strlen": cmd = KnownCommand::STRLEN;
case b"ssubscribe": cmd = KnownCommand::SSUBSCRIBE;
case b"subscribe": cmd = KnownCommand::SUBSCRIBE;
case b"sunsubscribe": cmd = KnownCommand::SUNSUBSCRIBE;
case b"ttl": cmd = KnownCommand::TTL; case b"ttl": cmd = KnownCommand::TTL;
case b"type": cmd = KnownCommand::TYPE; case b"type": cmd = KnownCommand::TYPE;
case b"unsubscribe": cmd = KnownCommand::UNSUBSCRIBE;
default: cmd = Null; default: cmd = Null;
} }
@ -368,12 +386,40 @@ public function is_auth(data: RESP::ClientData): bool {
return data.command.known && *(data.command.known) == KnownCommand::AUTH && |data.command.raw| >= 2; return data.command.known && *(data.command.known) == KnownCommand::AUTH && |data.command.raw| >= 2;
} }
type Hello = struct {
requested_resp_version: optional<bytes>;
};
public function make_hello(command: Command): Hello {
local hi: Hello = [$requested_resp_version = Null];
if (|command.raw| > 1)
hi.requested_resp_version = command.raw[1];
return hi;
}
public function is_hello(data: RESP::ClientData): bool {
return data.command.known && *(data.command.known) == KnownCommand::HELLO;
}
type ReplyData = struct { type ReplyData = struct {
value: optional<bytes>; value: optional<bytes>;
}; };
public function is_err(server_data: RESP::ServerData): bool { public type ReplyType = enum {
return server_data.data?.simple_error || server_data.data?.bulk_error; Reply, # A response to a command
Error, # An error response to a command
Push, # A server message that is not responding to a command
};
public function classify(server_data: RESP::ServerData): ReplyType {
if (server_data.data?.simple_error || server_data.data?.bulk_error)
return ReplyType::Error;
# We can tell with RESP3 this is push here, but RESP2 relies on scripts
if (server_data.data?.push)
return ReplyType::Push;
return ReplyType::Reply;
} }
function bulk_string_content(bulk: RESP::BulkString): bytes { function bulk_string_content(bulk: RESP::BulkString): bytes {
@ -397,10 +443,10 @@ function stringify(data: RESP::Data): optional<bytes> {
return bulk_string_content(data.verbatim_string); return bulk_string_content(data.verbatim_string);
else if (data?.boolean) else if (data?.boolean)
return data.boolean.val ? b"T" : b"F"; return data.boolean.val ? b"T" : b"F";
else if (data?.array) { else if (data?.array || data?.push) {
local res = b"["; local res = b"[";
local first = True; local first = True;
for (ele in data.array.elements) { for (ele in data?.array ? data.array.elements : data.push.elements) {
if (!first) if (!first)
res += b", "; res += b", ";
local ele_stringified = stringify(ele); local ele_stringified = stringify(ele);

View file

@ -12,9 +12,14 @@ export Redis::KnownCommand;
on RESP::ClientData if ( Redis::is_set(self) ) -> event Redis::set_command($conn, Redis::make_set(self.command)); on RESP::ClientData if ( Redis::is_set(self) ) -> event Redis::set_command($conn, Redis::make_set(self.command));
on RESP::ClientData if ( Redis::is_get(self) ) -> event Redis::get_command($conn, Redis::make_get(self.command).key); on RESP::ClientData if ( Redis::is_get(self) ) -> event Redis::get_command($conn, Redis::make_get(self.command).key);
on RESP::ClientData if ( Redis::is_auth(self) ) -> event Redis::auth_command($conn, Redis::make_auth(self.command)); on RESP::ClientData if ( Redis::is_auth(self) ) -> event Redis::auth_command($conn, Redis::make_auth(self.command));
on RESP::ClientData if ( Redis::is_hello(self) ) -> event Redis::hello_command($conn, Redis::make_hello(self.command));
# All client data is a command # All client data is a command
on RESP::ClientData -> event Redis::command($conn, self.command); on RESP::ClientData -> event Redis::command($conn, self.command);
on RESP::ServerData if ( ! Redis::is_err(self) ) -> event Redis::reply($conn, Redis::make_server_reply(self)); on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Reply ) ->
on RESP::ServerData if ( Redis::is_err(self) ) -> event Redis::error($conn, Redis::make_server_reply(self)); event Redis::reply($conn, Redis::make_server_reply(self));
on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Error ) ->
event Redis::error($conn, Redis::make_server_reply(self));
on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Push ) ->
event Redis::server_push($conn, Redis::make_server_reply(self));

View file

@ -79,6 +79,11 @@ public type ServerData = unit {
%synchronize-after = b"\x0d\x0a"; %synchronize-after = b"\x0d\x0a";
var depth: uint8& = new uint8; var depth: uint8& = new uint8;
data: Data(self.depth); data: Data(self.depth);
var type_: Redis::ReplyType;
on %done {
self.type_ = Redis::classify(self);
}
}; };
type Data = unit(depth: uint8&) { type Data = unit(depth: uint8&) {

View file

@ -615,6 +615,9 @@ connection {
} }
* pending: table[count] of record Redis::Info, log=F, optional=F * pending: table[count] of record Redis::Info, log=F, optional=F
Redis::Info { ... } Redis::Info { ... }
* resp_version: enum Redis::RESPVersion, log=F, optional=T
* skip_commands: set[count], log=F, optional=F
* subscribed_mode: bool, log=F, optional=T
* violation: bool, log=F, optional=T * violation: bool, log=F, optional=T
} }
* removal_hooks: set[func], log=F, optional=T * removal_hooks: set[func], log=F, optional=T

View file

@ -0,0 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Got published data!, [value=<uninitialized>]
Got published data!, [value=<uninitialized>]
Got published data!, [value=[message, Foo, Hi:)]]
Got published data!, [value=[pmessage, F*, Foo, Hi:)]]
Got published data!, [value=[pmessage, F*, Foobar, Hello!]]

View file

@ -1 +1,6 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Got published data!, [value=<uninitialized>]
Got published data!, [value=<uninitialized>]
Got published data!, [value=[message, Foo, Hi there :)]]
Got published data!, [value=[pmessage, F*, Foo, Hi there :)]]
Got published data!, [value=[pmessage, F*, FeeFooFiiFum, Hello! :)]]

View file

@ -7,7 +7,11 @@
#open XXXX-XX-XX-XX-XX-XX #open XXXX-XX-XX-XX-XX-XX
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p cmd.name cmd.key cmd.value success reply.value #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p cmd.name cmd.key cmd.value success reply.value
#types time string addr port addr port string string string bool string #types time string addr port addr port string string string bool string
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 SUBSCRIBE - - T - XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 60833 127.0.0.1 6379 PUBLISH - - T -
XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 56163 127.0.0.1 6379 PUBLISH - - T - XXXXXXXXXX.XXXXXX C4J4Th3PJpwUYZZ6gc 127.0.0.1 60837 127.0.0.1 6379 PUBLISH - - T -
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 - - - T [message, my_channel, hello :)] XXXXXXXXXX.XXXXXX CtPZjS20MLrsMUOJi2 127.0.0.1 60838 127.0.0.1 6379 SET sanity_check you_are_sane T OK
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 SUBSCRIBE - - - -
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 PSUBSCRIBE - - - -
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 RESET - - T RESET
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 GET sanity_check - T you_are_sane
#close XXXX-XX-XX-XX-XX-XX #close XXXX-XX-XX-XX-XX-XX

Binary file not shown.

View file

@ -0,0 +1,16 @@
# @TEST-DOC: Test Zeek parsing pubsub commands in RESP3
# @TEST-REQUIRES: have-spicy
#
# @TEST-EXEC: zeek -b -r $TRACES/redis/pubsub-resp3.pcap %INPUT >output
# @TEST-EXEC: btest-diff output
# Test pub/sub from Redis. This has two subscribers, one using a pattern. Then, the
# messages that were published get printed to output.
@load base/protocols/redis
event Redis::server_push(c: connection, data: Redis::ReplyData)
{
# The first 2 are SUBSCRIBE replies, the other 3 are message and pmessage
print "Got published data!", data;
}

View file

@ -5,8 +5,12 @@
# @TEST-EXEC: btest-diff output # @TEST-EXEC: btest-diff output
# @TEST-EXEC: btest-diff redis.log # @TEST-EXEC: btest-diff redis.log
# Testing the example of pub sub in REDIS docs: # Test pub/sub from Redis. This has two subscribers, one using a pattern. Then, the
# https://redis.io/docs/latest/develop/interact/pubsub/ # messages that were published get printed to output.
# These are just commands between two different clients, one PUBLISH and one SUBSCRIBE
@load base/protocols/redis @load base/protocols/redis
event Redis::server_push(c: connection, data: Redis::ReplyData)
{
print "Got published data!", data;
}