Handle Redis protocol message separately

Closes #4504

Messages from pub/sub need separate handling in order to not mess up
client request/reply correlation.
This commit is contained in:
Evan Typanski 2025-06-10 16:00:22 -04:00
parent bbca02fe70
commit 5af510d5d7
7 changed files with 52 additions and 10 deletions

View file

@ -101,3 +101,11 @@ global reply: event(c: connection, data: ReplyData);
## ##
## data: The server data sent to the client. ## data: The server data sent to the client.
global error: event(c: connection, data: ReplyData); global error: event(c: connection, data: ReplyData);
## Generated for every message the server sends that is not a reply to a
## command.
##
## c: The connection.
##
## data: The server data sent to the client.
global server_message: event(c: connection, data: ReplyData);

View file

@ -372,8 +372,27 @@ type ReplyData = struct {
value: optional<bytes>; value: optional<bytes>;
}; };
public function is_err(server_data: RESP::ServerData): bool { public type ReplyType = enum {
return server_data.data?.simple_error || server_data.data?.bulk_error; Reply, # A response to a command
Error, # An error response to a command
Message, # A server message that is not responding to a command
};
public function classify(server_data: RESP::ServerData): ReplyType {
if (server_data.data?.simple_error || server_data.data?.bulk_error)
return ReplyType::Error;
# 'message' and 'pmessage' responses (from pub/sub) are handled specially.
if (server_data.data?.array) {
if (server_data.data.array.num_elements >= 3) {
local stringified = stringify(*server_data.data.array.elements.at(0));
if (stringified && (*stringified == b"message" || *stringified == b"pmessage")) {
return ReplyType::Message;
}
}
}
return ReplyType::Reply;
} }
function bulk_string_content(bulk: RESP::BulkString): bytes { function bulk_string_content(bulk: RESP::BulkString): bytes {

View file

@ -16,5 +16,9 @@ on RESP::ClientData if ( Redis::is_auth(self) ) -> event Redis::auth_command($co
# All client data is a command # All client data is a command
on RESP::ClientData -> event Redis::command($conn, self.command); on RESP::ClientData -> event Redis::command($conn, self.command);
on RESP::ServerData if ( ! Redis::is_err(self) ) -> event Redis::reply($conn, Redis::make_server_reply(self)); on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Reply ) ->
on RESP::ServerData if ( Redis::is_err(self) ) -> event Redis::error($conn, Redis::make_server_reply(self)); event Redis::reply($conn, Redis::make_server_reply(self));
on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Error ) ->
event Redis::error($conn, Redis::make_server_reply(self));
on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Message ) ->
event Redis::server_message($conn, Redis::make_server_reply(self));

View file

@ -1 +1,4 @@
### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63.
Got published data!, [value=[message, Foo, Hi there :)]]
Got published data!, [value=[pmessage, F*, Foo, Hi there :)]]
Got published data!, [value=[pmessage, F*, FeeFooFiiFum, Hello! :)]]

View file

@ -7,7 +7,11 @@
#open XXXX-XX-XX-XX-XX-XX #open XXXX-XX-XX-XX-XX-XX
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p cmd.name cmd.key cmd.value success reply.value #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p cmd.name cmd.key cmd.value success reply.value
#types time string addr port addr port string string string bool string #types time string addr port addr port string string string bool string
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 SUBSCRIBE - - T - XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 SUBSCRIBE - - T -
XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 56163 127.0.0.1 6379 PUBLISH - - T - XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 PSUBSCRIBE - - T -
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 - - - T [message, my_channel, hello :)] XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 60833 127.0.0.1 6379 PUBLISH - - T -
XXXXXXXXXX.XXXXXX C4J4Th3PJpwUYZZ6gc 127.0.0.1 60837 127.0.0.1 6379 PUBLISH - - T -
XXXXXXXXXX.XXXXXX CtPZjS20MLrsMUOJi2 127.0.0.1 60838 127.0.0.1 6379 SET sanity_check you_are_sane T OK
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 RESET - - T RESET
XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 GET sanity_check - T you_are_sane
#close XXXX-XX-XX-XX-XX-XX #close XXXX-XX-XX-XX-XX-XX

View file

@ -5,8 +5,12 @@
# @TEST-EXEC: btest-diff output # @TEST-EXEC: btest-diff output
# @TEST-EXEC: btest-diff redis.log # @TEST-EXEC: btest-diff redis.log
# Testing the example of pub sub in REDIS docs: # Test pub/sub from Redis. This has two subscribers, one using a pattern. Then, the
# https://redis.io/docs/latest/develop/interact/pubsub/ # messages that were published get printed to output.
# These are just commands between two different clients, one PUBLISH and one SUBSCRIBE
@load base/protocols/redis @load base/protocols/redis
event Redis::server_message(c: connection, data: Redis::ReplyData)
{
print "Got published data!", data;
}