From 5af510d5d7a8bfbba5d81f21e007deed96d98d36 Mon Sep 17 00:00:00 2001 From: Evan Typanski Date: Tue, 10 Jun 2025 16:00:22 -0400 Subject: [PATCH] Handle Redis protocol `message` separately Closes #4504 Messages from pub/sub need separate handling in order to not mess up client request/reply correlation. --- .../base/protocols/redis/spicy-events.zeek | 8 ++++++ src/analyzer/protocol/redis/redis.spicy | 23 ++++++++++++++++-- src/analyzer/protocol/redis/resp.evt | 8 ++++-- .../output | 3 +++ .../redis.log | 10 +++++--- testing/btest/Traces/redis/pubsub.pcap | Bin 1520 -> 4850 bytes .../scripts/base/protocols/redis/pubsub.zeek | 10 +++++--- 7 files changed, 52 insertions(+), 10 deletions(-) diff --git a/scripts/base/protocols/redis/spicy-events.zeek b/scripts/base/protocols/redis/spicy-events.zeek index ad9a500f18..f397e019ca 100644 --- a/scripts/base/protocols/redis/spicy-events.zeek +++ b/scripts/base/protocols/redis/spicy-events.zeek @@ -101,3 +101,11 @@ global reply: event(c: connection, data: ReplyData); ## ## data: The server data sent to the client. global error: event(c: connection, data: ReplyData); + +## Generated for every message the server sends that is not a reply to a +## command. +## +## c: The connection. +## +## data: The server data sent to the client. +global server_message: event(c: connection, data: ReplyData); diff --git a/src/analyzer/protocol/redis/redis.spicy b/src/analyzer/protocol/redis/redis.spicy index 945ae8c83d..1f9f3c1f51 100644 --- a/src/analyzer/protocol/redis/redis.spicy +++ b/src/analyzer/protocol/redis/redis.spicy @@ -372,8 +372,27 @@ type ReplyData = struct { value: optional; }; -public function is_err(server_data: RESP::ServerData): bool { - return server_data.data?.simple_error || server_data.data?.bulk_error; +public type ReplyType = enum { + Reply, # A response to a command + Error, # An error response to a command + Message, # A server message that is not responding to a command +}; + +public function classify(server_data: RESP::ServerData): ReplyType { + if (server_data.data?.simple_error || server_data.data?.bulk_error) + return ReplyType::Error; + + # 'message' and 'pmessage' responses (from pub/sub) are handled specially. + if (server_data.data?.array) { + if (server_data.data.array.num_elements >= 3) { + local stringified = stringify(*server_data.data.array.elements.at(0)); + if (stringified && (*stringified == b"message" || *stringified == b"pmessage")) { + return ReplyType::Message; + } + } + } + + return ReplyType::Reply; } function bulk_string_content(bulk: RESP::BulkString): bytes { diff --git a/src/analyzer/protocol/redis/resp.evt b/src/analyzer/protocol/redis/resp.evt index f67f8fbc1c..0c840f5377 100644 --- a/src/analyzer/protocol/redis/resp.evt +++ b/src/analyzer/protocol/redis/resp.evt @@ -16,5 +16,9 @@ on RESP::ClientData if ( Redis::is_auth(self) ) -> event Redis::auth_command($co # All client data is a command on RESP::ClientData -> event Redis::command($conn, self.command); -on RESP::ServerData if ( ! Redis::is_err(self) ) -> event Redis::reply($conn, Redis::make_server_reply(self)); -on RESP::ServerData if ( Redis::is_err(self) ) -> event Redis::error($conn, Redis::make_server_reply(self)); +on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Reply ) -> + event Redis::reply($conn, Redis::make_server_reply(self)); +on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Error ) -> + event Redis::error($conn, Redis::make_server_reply(self)); +on RESP::ServerData if ( Redis::classify(self) == Redis::ReplyType::Message ) -> + event Redis::server_message($conn, Redis::make_server_reply(self)); diff --git a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output index 49d861c74c..d8dc6c1fcc 100644 --- a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output +++ b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/output @@ -1 +1,4 @@ ### BTest baseline data generated by btest-diff. Do not edit. Use "btest -U/-u" to update. Requires BTest >= 0.63. +Got published data!, [value=[message, Foo, Hi there :)]] +Got published data!, [value=[pmessage, F*, Foo, Hi there :)]] +Got published data!, [value=[pmessage, F*, FeeFooFiiFum, Hello! :)]] diff --git a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log index c9d2b81974..be7e111203 100644 --- a/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log +++ b/testing/btest/Baseline/scripts.base.protocols.redis.pubsub/redis.log @@ -7,7 +7,11 @@ #open XXXX-XX-XX-XX-XX-XX #fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p cmd.name cmd.key cmd.value success reply.value #types time string addr port addr port string string string bool string -XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 SUBSCRIBE - - T - -XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 56163 127.0.0.1 6379 PUBLISH - - T - -XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 56162 127.0.0.1 6379 - - - T [message, my_channel, hello :)] +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 SUBSCRIBE - - T - +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 PSUBSCRIBE - - T - +XXXXXXXXXX.XXXXXX ClEkJM2Vm5giqnMf4h 127.0.0.1 60833 127.0.0.1 6379 PUBLISH - - T - +XXXXXXXXXX.XXXXXX C4J4Th3PJpwUYZZ6gc 127.0.0.1 60837 127.0.0.1 6379 PUBLISH - - T - +XXXXXXXXXX.XXXXXX CtPZjS20MLrsMUOJi2 127.0.0.1 60838 127.0.0.1 6379 SET sanity_check you_are_sane T OK +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 RESET - - T RESET +XXXXXXXXXX.XXXXXX CHhAvVGS1DHFjwGM9 127.0.0.1 60831 127.0.0.1 6379 GET sanity_check - T you_are_sane #close XXXX-XX-XX-XX-XX-XX diff --git a/testing/btest/Traces/redis/pubsub.pcap b/testing/btest/Traces/redis/pubsub.pcap index 458070f948d89e2d4e528d39bd421720125154c1..d02630a7a49c53af4083ed1c72b91087da606b14 100644 GIT binary patch literal 4850 zcmb7|drXyO9LHY)&*0!O(H3XJwWCofP!rUtWb=e^)OkbkR^pJ$Q4#SFM~2R+FqF+^ znwS0%n}p>>HYF4%i=bLD|7Hyt{|*rMM+TF&dRhX zbK%M*eP>DE{hF_VgKQU-YMcwftX<7EeuT}MqE8oryJBosZn7Jjm<)F>!mVPsKQwEh z+{g`UyAVvyjx3Av4n(=6EWDizw^Eapha@-%w{Vz|qlQb_e0{u9HW9ybqjT6Okg!ap z3hAC?y5HC6Q@N451Dw%chA3}CmP-mdrEsk#+*)b(y>KFUH@-zV7UcIw#^QTJCj8nb zIM~7t4$W>{!mot@KxR{QU?xJSOg?^QIk@G=2KxO$!;Q?vmkxE2t1*8AGhn^KvmD%9 z9FQ+7LDWkQd33vHf+O8>>89ssPUQRzPAY2VlDW;4xgC5aH{oNq`xg9S^hueUG;K=i zWCyOeW9Z}@$Es!S0=FY^_)wdbTXgnLwMJEq6kF(3=N}JI#zT`!sv=1zd$Lhe)sVg*nR17norX-9w<3=COGmiJ?P(B*0sBaKGJm z^13Gms_HEKW~Fg%b=CEMo?qkKBJ-a|fthgIf9heyV{l_R3t!w~WHv^wJ4dd@v~&cn zw^ew?g8PJ-Z@@L>*^xsY-3FTANVg|+({nT@a)yBOE^6kIXE#p{g-^QAPVa*OQ@qdU z(>)8_Zj3`$PL8WE4@1$tVufdj7>V2|u6M@MRb#K%NpE}>91|mvr^3pob0m(Hw^8Hp zJ%>&|fwc;ibII!SW%XrR^@i@lNc>=p)_qBx?7sgQbMc&ax=_T6S9dVN;ev@=WJgR# zOPp4fP-ztJpxB4F2p0Lf;kkokyoO!>S?oi=_>9Q7GFFG`$gBg?a zwF!>w&@PLWRHp;HZf zqVI1BEidIMQYn%#XK6ntAi5d?)n()?-;MQ`|j0`n0 zeJ__;$kmvRhbG?Pg=a0e=UMNr2$35(mCB)Oiz9vi8e3i;JpMtFiW46nAvDrN@(Y zW;nDgubtMUbWhqe^+`KfQTW+TeAeWXHcfldo=X%Cu7-6DVVDu*)ZaASSE3f&#}Cn~ zoDWNi*7_MTIdQiFb7a65(N|ZP$(Cp>;LR;$krVP)FFEkp@={NZ*91qN(G{Fgv+5%` zqOQ#?b@1`M*I${0rs0x}Gf_6q@n_D6Z=GrUc|w%z^RRaB8_YTe&!_0oHtXzUbiB<< z?1+N?y*a*W=(TK=FAlDT(ki0K`4k){)VQP`o~BOzkK%-;huHbT#Wtu$U$>kp(b_<8 zVs&0uO?b(ctOhe1kDa?E+XPRPTUvDjUX^YA0{&>jz(H(#GBwWn?W|G#Q2~CuF^Qo? z^XaP}7B(yQ>A*fFuR%8I=T*Fy*QC|I^>JAH`t9Vjwf9q|B10~jBTih+@#r%-3Xa`= hZ_jr|j;d%JVMH&zA6QXFaS@RgG1BG7^igbm{4ai0Ge!Ua literal 1520 zcmb7^O=#0#7{}i>rAlg`+bLR64@Gd!vaP~q87@`_9e5B+Wr%cLYh9dO3&u^BxnEan}pbK)VNsheO$>X4>)I5^k~{F zd}!Ow=OnFK%`g1!=n;7D5rGd!g5l$*hk}D}+1(}Z6W2x(@@P7pObI+@4MSERWZgxEj(5820fEQdCy;CA z54|Ta6S5bv_hlQ+RmW>Hn2ntGhg_r|+;NycU?uFmV)!?4p!e&KnmO1hvb;WXC)ULx zcGNOA_JjYGZ=C7bH0)M1+vd?eeqo$rwtEJrY=Ogih(ZqqJmcDc(Zgyi z2Q2jY^}n1p;2hn`IryC;Cty!-*se%yR~{O>qRL{uaN*q~+(MS@y1o#CdmRIBY#isr z=gZp5y|?f?2LRA508FzzW?dDa^AX&cS5jkFt1;wMEx18f^4q(m=ioQB)UNBLPt9)r pOa2V{8E*U!3vTrHP62lX`meXMb3_jE(9L6k!`xQTt&w9m{R0q)i4*_; diff --git a/testing/btest/scripts/base/protocols/redis/pubsub.zeek b/testing/btest/scripts/base/protocols/redis/pubsub.zeek index 15f6d9bb59..fac5d85144 100644 --- a/testing/btest/scripts/base/protocols/redis/pubsub.zeek +++ b/testing/btest/scripts/base/protocols/redis/pubsub.zeek @@ -5,8 +5,12 @@ # @TEST-EXEC: btest-diff output # @TEST-EXEC: btest-diff redis.log -# Testing the example of pub sub in REDIS docs: -# https://redis.io/docs/latest/develop/interact/pubsub/ -# These are just commands between two different clients, one PUBLISH and one SUBSCRIBE +# Test pub/sub from Redis. This has two subscribers, one using a pattern. Then, the +# messages that were published get printed to output. @load base/protocols/redis + +event Redis::server_message(c: connection, data: Redis::ReplyData) + { + print "Got published data!", data; + }