logging/Manager: Implement DelayTokenType as an actual opaque

With a bit of tweaking in the JavaScript plugin to support opaque types, this
will allow the delay functionality to work there, too.

Making the LogDelayToken an actual opaque seems reasonable, too. It's not
supposed to be user inspected.
This commit is contained in:
Arne Welzel 2023-11-27 18:18:08 +01:00
parent 2dbb467ba2
commit 5e046eee58
13 changed files with 130 additions and 32 deletions

View file

@ -13,6 +13,7 @@
#include "zeek/Func.h"
#include "zeek/IntrusivePtr.h"
#include "zeek/NetVar.h"
#include "zeek/OpaqueVal.h"
#include "zeek/RunState.h"
#include "zeek/Type.h"
#include "zeek/broker/Manager.h"
@ -28,6 +29,12 @@
using namespace std;
namespace zeek::detail {
extern zeek::OpaqueTypePtr log_delay_token_type;
};
namespace zeek::logging {
namespace detail {
@ -37,10 +44,43 @@ class DelayInfo;
using DelayInfoPtr = std::shared_ptr<DelayInfo>;
constexpr zeek::TypeTag ZeekDelayTokenTypeTag = TYPE_COUNT;
class LogDelayTokenVal : public OpaqueVal {
public:
explicit LogDelayTokenVal(DelayTokenType token) : OpaqueVal(zeek::detail::log_delay_token_type), token(token) {}
virtual ~LogDelayTokenVal() = default;
ValPtr DoClone(CloneState* state) override;
DelayTokenType Token() { return token; }
protected:
explicit LogDelayTokenVal() : LogDelayTokenVal(0) {}
DECLARE_OPAQUE_VALUE(LogDelayTokenVal)
private:
DelayTokenType token;
};
ValPtr LogDelayTokenVal::DoClone(CloneState* state) {
return state->NewClone(this, make_intrusive<LogDelayTokenVal>(Token()));
}
// Delay tokens are only valid on the same worker.
broker::expected<broker::data> LogDelayTokenVal::DoSerialize() const {
return broker::make_error(broker::ec::invalid_data, "cannot serialize delay tokens");
}
bool LogDelayTokenVal::DoUnserialize(const broker::data&) { return false; }
IMPLEMENT_OPAQUE_VALUE(LogDelayTokenVal)
DelayTokenType to_internal_delay_token(const ValPtr& val) {
assert(val->GetType()->Tag() == ZeekDelayTokenTypeTag);
return val->AsCount();
assert(val->GetType()->Tag() == TYPE_OPAQUE);
if ( auto* optr = dynamic_cast<LogDelayTokenVal*>(val.get()) )
return optr->Token();
zeek::reporter->Error("dynamic_cast of LogDelayToken failed: wrong opaque type provided?");
return 0;
}
using DelayWriteMap = std::map<WriteContext, DelayInfoPtr>;
@ -1151,12 +1191,12 @@ bool Manager::WriteToFilters(const Manager::Stream* stream, zeek::RecordValPtr c
ValPtr Manager::Delay(const EnumValPtr& id, const RecordValPtr record, FuncPtr post_delay_cb) {
if ( active_writes.size() == 0 ) {
reporter->Error("invalid Log::delay() call: no active write context available");
return zeek::val_mgr->Count(0);
return make_intrusive<detail::LogDelayTokenVal>();
}
const auto& active_write_ctx = active_writes.back();
if ( active_write_ctx.id != id || active_write_ctx.record != record ) {
reporter->Error("invalid Log::delay() call: active write mismatch");
return zeek::val_mgr->Count(0);
return make_intrusive<detail::LogDelayTokenVal>();
}
DBG_LOG(DBG_LOGGING, "Delay() for %p RefCnt=%d", record.get(), record->RefCnt());
@ -1175,7 +1215,7 @@ ValPtr Manager::Delay(const EnumValPtr& id, const RecordValPtr record, FuncPtr p
// This is the first time this Log::write() is delayed, allocate a
// new token an return it to script land.
detail::DelayTokenType token = ++last_delay_token;
token_val = zeek::val_mgr->Count(token);
token_val = zeek::make_intrusive<detail::LogDelayTokenVal>(token);
double expire_time = run_state::network_time + stream->max_delay_interval;
auto new_delay_info = std::make_shared<detail::DelayInfo>(active_write_ctx, token_val, expire_time);
if ( post_delay_cb )
@ -1201,12 +1241,22 @@ bool Manager::DelayFinish(const EnumValPtr& id, const RecordValPtr& record, cons
if ( ! stream )
return false;
if ( token_val->GetType()->Tag() != detail::ZeekDelayTokenTypeTag ) {
if ( token_val->GetType()->Tag() != TYPE_OPAQUE ) {
reporter->Error("invalid delay token type %s", zeek::type_name(token_val->GetType()->Tag()));
return false;
}
const auto& token_type_name = token_val->GetType<zeek::OpaqueType>()->Name();
if ( token_type_name != std::string_view{"LogDelayToken"} ) {
reporter->Error("invalid Log::delay_finish() call: wrong opaque token type: %s", token_type_name.c_str());
return false;
}
detail::DelayTokenType token = detail::to_internal_delay_token(token_val);
if ( token == 0 ) {
reporter->Error("invalid Log::delay_finish() call: invalid token provided");
return false;
}
DBG_LOG(DBG_LOGGING, "DelayFinish() for %p RefCnt=%d token=%ld", record.get(), record->RefCnt(), token);
const auto& it = stream->delay_tokens.find(token);