mirror of
https://github.com/zeek/zeek.git
synced 2025-10-11 11:08:20 +00:00
Add JSON storage serializer, use with existing backends/tests
This commit is contained in:
parent
201d4508e6
commit
88786a28a2
18 changed files with 161 additions and 59 deletions
|
@ -254,24 +254,30 @@ OperationResult Redis::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool
|
|||
|
||||
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
|
||||
|
||||
std::string format = "SET %s:%s %s";
|
||||
std::string format = "SET %s:%b %b";
|
||||
if ( ! overwrite )
|
||||
format.append(" NX");
|
||||
|
||||
auto json_key = key->ToJSON()->ToStdString();
|
||||
auto json_value = value->ToJSON()->ToStdString();
|
||||
auto key_data = serializer->Serialize(key);
|
||||
if ( ! key_data )
|
||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
||||
|
||||
auto val_data = serializer->Serialize(value);
|
||||
if ( ! val_data )
|
||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize value"};
|
||||
|
||||
int status;
|
||||
// Use built-in expiration if reading live data, since time will move
|
||||
// forward consistently. If reading pcaps, we'll do something else.
|
||||
if ( expiration_time > 0.0 && ! zeek::run_state::reading_traces ) {
|
||||
format.append(" PXAT %" PRIu64);
|
||||
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
|
||||
json_value.data(), static_cast<uint64_t>(expiration_time * 1e3));
|
||||
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), key_data->data(),
|
||||
key_data->size(), val_data->data(), val_data->size(),
|
||||
static_cast<uint64_t>(expiration_time * 1e3));
|
||||
}
|
||||
else
|
||||
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), json_key.data(),
|
||||
json_value.data());
|
||||
status = redisAsyncCommand(async_ctx, redisPut, cb, format.c_str(), key_prefix.data(), key_data->data(),
|
||||
key_data->size(), val_data->data(), val_data->size());
|
||||
|
||||
if ( connected && status == REDIS_ERR )
|
||||
return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue put operation: %s", async_ctx->errstr)};
|
||||
|
@ -284,10 +290,10 @@ OperationResult Redis::DoPut(ResultCallback* cb, ValPtr key, ValPtr value, bool
|
|||
format = "ZADD %s_expire";
|
||||
if ( ! overwrite )
|
||||
format.append(" NX");
|
||||
format += " %f %s";
|
||||
format += " %f %b";
|
||||
|
||||
status = redisAsyncCommand(async_ctx, redisZADD, NULL, format.c_str(), key_prefix.data(), expiration_time,
|
||||
json_key.data());
|
||||
key_data->data(), key_data->size());
|
||||
if ( connected && status == REDIS_ERR )
|
||||
return {ReturnCode::OPERATION_FAILED, util::fmt("ZADD operation failed: %s", async_ctx->errstr)};
|
||||
|
||||
|
@ -307,8 +313,12 @@ OperationResult Redis::DoGet(ResultCallback* cb, ValPtr key) {
|
|||
|
||||
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
|
||||
|
||||
int status = redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%s", key_prefix.data(),
|
||||
key->ToJSON()->ToStdStringView().data());
|
||||
auto key_data = serializer->Serialize(key);
|
||||
if ( ! key_data )
|
||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
||||
|
||||
int status =
|
||||
redisAsyncCommand(async_ctx, redisGet, cb, "GET %s:%b", key_prefix.data(), key_data->data(), key_data->size());
|
||||
|
||||
if ( connected && status == REDIS_ERR )
|
||||
return {ReturnCode::OPERATION_FAILED, util::fmt("Failed to queue get operation: %s", async_ctx->errstr)};
|
||||
|
@ -330,8 +340,12 @@ OperationResult Redis::DoErase(ResultCallback* cb, ValPtr key) {
|
|||
|
||||
auto locked_scope = conditionally_lock(zeek::run_state::reading_traces, expire_mutex);
|
||||
|
||||
int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%s", key_prefix.data(),
|
||||
key->ToJSON()->ToStdStringView().data());
|
||||
auto key_data = serializer->Serialize(key);
|
||||
if ( ! key_data )
|
||||
return {ReturnCode::SERIALIZATION_FAILED, "Failed to serialize key"};
|
||||
|
||||
int status = redisAsyncCommand(async_ctx, redisErase, cb, "DEL %s:%b", key_prefix.data(), key_data->data(),
|
||||
key_data->size());
|
||||
|
||||
if ( connected && status == REDIS_ERR )
|
||||
return {ReturnCode::OPERATION_FAILED, async_ctx->errstr};
|
||||
|
@ -439,7 +453,7 @@ void Redis::HandleGetResult(redisReply* reply, ResultCallback* callback) {
|
|||
else if ( reply->type == REDIS_REPLY_ERROR )
|
||||
res = ParseReplyError("get", reply->str);
|
||||
else {
|
||||
auto val = zeek::detail::ValFromJSON(reply->str, val_type, Func::nil);
|
||||
auto val = serializer->Unserialize({(std::byte*)reply->str, reply->len}, val_type);
|
||||
if ( val )
|
||||
res = {ReturnCode::SUCCESS, "", val.value()};
|
||||
else
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue