spicy-redis: Separate client/server

This makes the parser more official and splits the client/server out
from each other. Apparently they're different enough to be separate.
This commit is contained in:
Evan Typanski 2024-10-23 11:10:44 -04:00
parent f0f2969a66
commit 757cbbf902
28 changed files with 809 additions and 702 deletions

View file

@ -1,12 +1,12 @@
@load base/protocols/conn/removal-hooks
module RESP;
module Redis;
export {
## Log stream identifier.
redef enum Log::ID += { LOG };
## The ports to register RESP for.
## The ports to register Redis for.
const ports = { 6379/tcp } &redef;
type SetCommand: record {
@ -36,9 +36,18 @@ export {
key: string &log &optional;
## The value, if this command is known to have a value
value: string &log &optional;
## The command in an enum if it was known
known: Redis::KnownCommand &optional;
};
## Record type containing the column fields of the RESP log.
type ServerData: record {
## Was this an error?
err: bool &log;
## The string response, if it was a simple string or error
data: string &log &optional;
};
## Record type containing the column fields of the Redis log.
type Info: record {
## Timestamp for when the activity happened.
ts: time &log;
@ -47,18 +56,34 @@ export {
## The connection's 4-tuple of endpoint addresses/ports.
id: conn_id &log;
## The Redis command
cmd: Command &log;
cmd: Command &log &optional;
## The response for the command
response: ServerData &log &optional;
};
## A default logging policy hook for the stream.
global log_policy: Log::PolicyHook;
## Default hook into RESP logging.
## Default hook into Redis logging.
global log_resp: event(rec: Info);
global finalize_redis: Conn::RemovalHook;
type State: record {
## Pending requests.
pending: table[count] of Info;
## Current request in the pending queue.
current_request: count &default=0;
## Current response in the pending queue.
current_response: count &default=0;
};
}
redef record connection += {
# TODO: Rename
redis_resp: Info &optional;
redis_state: State &optional;
};
redef likely_server_ports += { ports };
@ -69,42 +94,101 @@ redef likely_server_ports += { ports };
#
# function get_file_handle(c: connection, is_orig: bool): string
# {
# return cat(Analyzer::ANALYZER_SPICY_RESP, c$start_time, c$id, is_orig);
# return cat(Analyzer::ANALYZER_SPICY_REDIS, c$start_time, c$id, is_orig);
# }
event zeek_init() &priority=5
{
Log::create_stream(RESP::LOG, [ $columns=Info, $ev=log_resp, $path="resp",
Log::create_stream(Redis::LOG, [ $columns=Info, $ev=log_resp, $path="resp",
$policy=log_policy ]);
Analyzer::register_for_ports(Analyzer::ANALYZER_SPICY_RESP, ports);
Analyzer::register_for_ports(Analyzer::ANALYZER_SPICY_REDIS, ports);
# TODO: To activate the file handle function above, uncomment this.
# Files::register_protocol(Analyzer::ANALYZER_SPICY_RESP, [$get_file_handle=RESP::get_file_handle ]);
# Files::register_protocol(Analyzer::ANALYZER_SPICY_REDIS, [$get_file_handle=Redis::get_file_handle ]);
}
# Initialize logging state.
hook set_session(c: connection, cmd: Command)
function new_redis_session(c: connection): Info
{
if ( c?$redis_resp )
return;
c$redis_resp = Info($ts=network_time(), $uid=c$uid, $id=c$id, $cmd=cmd);
return Info($ts=network_time(), $uid=c$uid, $id=c$id);
}
function emit_log(c: connection)
function set_state(c: connection, is_orig: bool)
{
if ( ! c?$redis_resp )
return;
if ( ! c?$redis_state )
{
local s: State;
c$redis_state = s;
Conn::register_removal_hook(c, finalize_redis);
}
Log::write(RESP::LOG, c$redis_resp);
delete c$redis_resp;
if ( is_orig )
{
if ( c$redis_state$current_request !in c$redis_state$pending )
c$redis_state$pending[c$redis_state$current_request] = new_redis_session(c);
c$redis_resp = c$redis_state$pending[c$redis_state$current_request];
}
else
{
if ( c$redis_state$current_response !in c$redis_state$pending )
c$redis_state$pending[c$redis_state$current_response] = new_redis_session(c);
c$redis_resp = c$redis_state$pending[c$redis_state$current_response];
}
}
event RESP::command(c: connection, is_orig: bool, command: Command)
event Redis::command(c: connection, is_orig: bool, command: Command)
{
hook set_session(c, command);
#hook set_session(c, command);
local info = c$redis_resp;
emit_log(c);
# TODO: We need to care about whether the reply was suppressed with
# CLIENT REPLY [OFF|SKIP]
#local info = c$redis_resp;
#emit_log(c);
# TODO refactor this since it's used a couple times
if ( ! c?$redis_state )
{
local s: State;
c$redis_state = s;
Conn::register_removal_hook(c, finalize_redis);
}
++c$redis_state$current_request;
set_state(c, T);
c$redis_resp$cmd = command;
}
event Redis::server_data(c: connection, is_orig: bool, data: ServerData)
{
if ( ! c?$redis_state )
{
local s: State;
c$redis_state = s;
Conn::register_removal_hook(c, finalize_redis);
}
++c$redis_state$current_response;
set_state(c, F);
c$redis_resp$response = data;
# TODO: Do stuff with pending so that finalize_redis and pipelining work
Log::write(Redis::LOG, c$redis_resp);
delete c$redis_state$pending[c$redis_state$current_response];
}
hook finalize_redis(c: connection)
{
# Flush all pending but incomplete request/response pairs.
if ( c?$redis_state )
{
for ( r, info in c$redis_state$pending )
{
# We don't use pending elements at index 0.
if ( r == 0 ) next;
#Log::write(HTTP::LOG, info);
Log::write(Redis::LOG, info);
#delete c$redis_resp;
}
}
}