Merge remote-tracking branch 'origin/master' into topic/jsiwek/gh-893-intrusive-ptr-migration

This commit is contained in:
Jon Siwek 2020-05-21 14:44:55 -07:00
commit 2cbf36721c
35 changed files with 594 additions and 387 deletions

View file

@ -37,7 +37,7 @@ public:
/**
* Returns a descriptive name for the thread. If not set via
* SetName(). If not set, a default name is choosen automatically.
* SetName(), a default name is chosen automatically.
*
* This method is safe to call from any thread.
*/

View file

@ -5,6 +5,7 @@
#include "NetVar.h"
#include "iosource/Manager.h"
#include "Event.h"
using namespace threading;
@ -128,6 +129,60 @@ void Manager::StartHeartbeatTimer()
timer_mgr->Add(new HeartbeatTimer(network_time + zeek::BifConst::Threading::heartbeat_interval));
}
// Raise everything in here as warnings so it is passed to scriptland without
// looking "fatal". In addition to these warnings, ReaderBackend will queue
// one reporter message.
bool Manager::SendEvent(MsgThread* thread, const std::string& name, const int num_vals, Value* *vals) const
{
EventHandler* handler = event_registry->Lookup(name);
if ( handler == nullptr )
{
reporter->Warning("Thread %s: Event %s not found", thread->Name(), name.c_str());
Value::delete_value_ptr_array(vals, num_vals);
return false;
}
#ifdef DEBUG
DBG_LOG(DBG_INPUT, "Thread %s: SendEvent for event %s with %d vals",
thread->Name(), name.c_str(), num_vals);
#endif
const auto& type = handler->GetType()->Params();
int num_event_vals = type->NumFields();
if ( num_vals != num_event_vals )
{
reporter->Warning("Thread %s: Wrong number of values for event %s", thread->Name(), name.c_str());
Value::delete_value_ptr_array(vals, num_vals);
return false;
}
bool convert_error = false;
zeek::Args vl;
vl.reserve(num_vals);
for ( int j = 0; j < num_vals; j++)
{
Val* v = Value::ValueToVal(std::string("thread ") + thread->Name(), vals[j], convert_error);
vl.emplace_back(AdoptRef{}, v);
if ( v && ! convert_error && ! same_type(type->GetFieldType(j).get(), v->GetType().get()) )
{
convert_error = true;
type->GetFieldType(j)->Error("SendEvent types do not match", v->GetType().get());
}
}
Value::delete_value_ptr_array(vals, num_vals);
if ( convert_error )
return false;
else if ( handler )
mgr.Enqueue(handler, std::move(vl), SOURCE_LOCAL);
return true;
}
void Manager::Flush()
{
bool do_beat = false;

View file

@ -88,6 +88,18 @@ public:
*/
void KillThreads();
/**
* Allows threads to directly send Zeek events. The num_vals and vals must be
* the same the named event expects. Takes ownership of threading::Value fields.
*
* @param thread Thread raising the event
* @param name Name of event being raised
* @param num_vals Number of values passed to the event
* @param vals Values passed to the event
* @returns True on success false on failure.
*/
bool SendEvent(MsgThread* thread, const std::string& name, const int num_vals, threading::Value* *vals) const;
protected:
friend class BasicThread;
friend class MsgThread;

View file

@ -126,6 +126,31 @@ private:
}
// An event that the child wants to pass into the main event queue
class SendEventMessage final : public OutputMessage<MsgThread> {
public:
SendEventMessage(MsgThread* thread, const char* name, const int num_vals, Value* *val)
: OutputMessage<MsgThread>("SendEvent", thread),
name(copy_string(name)), num_vals(num_vals), val(val) {}
~SendEventMessage() override { delete [] name; }
bool Process() override
{
bool success = thread_mgr->SendEvent(Object(), name, num_vals, val);
if ( ! success )
reporter->Error("SendEvent for event %s failed", name);
return true; // We do not want to die if sendEvent fails because the event did not return.
}
private:
const char* name;
const int num_vals;
Value* *val;
};
////// Methods.
Message::~Message()
@ -363,6 +388,11 @@ void MsgThread::SendOut(BasicOutputMessage* msg, bool force)
flare.Fire();
}
void MsgThread::SendEvent(const char* name, const int num_vals, Value* *vals)
{
SendOut(new SendEventMessage(this, name, num_vals, vals));
}
BasicOutputMessage* MsgThread::RetrieveOut()
{
BasicOutputMessage* msg = queue_out.Get();

View file

@ -60,6 +60,18 @@ public:
*/
void SendOut(BasicOutputMessage* msg) { return SendOut(msg, false); }
/**
* Allows the child thread to send a specified Zeek event. The given Vals
* must match the values expected by the event.
*
* @param name name of the bro event to send
*
* @param num_vals number of entries in \a vals
*
* @param vals the values to be given to the event
*/
void SendEvent(const char* name, const int num_vals, threading::Value* *vals);
/**
* Reports an informational message from the child thread. The main
* thread will pass this to the Reporter once received.
@ -393,7 +405,7 @@ protected:
};
/**
* A paremeterized InputMessage that stores a pointer to an argument object.
* A parameterized InputMessage that stores a pointer to an argument object.
* Normally, the objects will be used from the Process() callback.
*/
template<typename O>

View file

@ -4,6 +4,14 @@
#include "SerialTypes.h"
#include "SerializationFormat.h"
#include "Reporter.h"
// The following are required for ValueToVal.
#include "Val.h"
#include "BroString.h"
#include "RE.h"
#include "module_util.h"
#include "ID.h"
#include "Expr.h"
#include "Scope.h"
using namespace threading;
@ -346,7 +354,6 @@ bool Value::Write(SerializationFormat* fmt) const
case IPv6:
return fmt->Write((char)6, "addr-family")
&& fmt->Write(val.addr_val.in.in6, "addr-in6");
break;
}
// Can't be reached.
@ -366,7 +373,6 @@ bool Value::Write(SerializationFormat* fmt) const
case IPv6:
return fmt->Write((char)6, "subnet-family")
&& fmt->Write(val.subnet_val.prefix.in.in6, "subnet-in6");
break;
}
// Can't be reached.
@ -417,5 +423,212 @@ bool Value::Write(SerializationFormat* fmt) const
type_name(type));
}
// unreachable
return false;
}
void Value::delete_value_ptr_array(Value** vals, int num_fields)
{
for ( int i = 0; i < num_fields; ++i )
delete vals[i];
delete [] vals;
}
Val* Value::ValueToVal(const std::string& source, const Value* val, bool& have_error)
{
if ( have_error )
return nullptr;
if ( ! val->present )
return nullptr; // unset field
switch ( val->type ) {
case TYPE_BOOL:
return val_mgr->Bool(val->val.int_val)->Ref();
case TYPE_INT:
return val_mgr->Int(val->val.int_val).release();
case TYPE_COUNT:
case TYPE_COUNTER:
return val_mgr->Count(val->val.int_val).release();
case TYPE_DOUBLE:
case TYPE_TIME:
case TYPE_INTERVAL:
return new Val(val->val.double_val, val->type);
case TYPE_STRING:
{
BroString *s = new BroString((const u_char*)val->val.string_val.data, val->val.string_val.length, true);
return new StringVal(s);
}
case TYPE_PORT:
return val_mgr->Port(val->val.port_val.port, val->val.port_val.proto)->Ref();
case TYPE_ADDR:
{
IPAddr* addr = nullptr;
switch ( val->val.addr_val.family ) {
case IPv4:
addr = new IPAddr(val->val.addr_val.in.in4);
break;
case IPv6:
addr = new IPAddr(val->val.addr_val.in.in6);
break;
default:
assert(false);
}
AddrVal* addrval = new AddrVal(*addr);
delete addr;
return addrval;
}
case TYPE_SUBNET:
{
IPAddr* addr = nullptr;
switch ( val->val.subnet_val.prefix.family ) {
case IPv4:
addr = new IPAddr(val->val.subnet_val.prefix.in.in4);
break;
case IPv6:
addr = new IPAddr(val->val.subnet_val.prefix.in.in6);
break;
default:
assert(false);
}
SubNetVal* subnetval = new SubNetVal(*addr, val->val.subnet_val.length);
delete addr;
return subnetval;
}
case TYPE_PATTERN:
{
RE_Matcher* re = new RE_Matcher(val->val.pattern_text_val);
re->Compile();
return new PatternVal(re);
}
case TYPE_TABLE:
{
IntrusivePtr<TypeList> set_index;
if ( val->val.set_val.size == 0 && val->subtype == TYPE_VOID )
// don't know type - unspecified table.
set_index = make_intrusive<TypeList>();
else
{
// all entries have to have the same type...
TypeTag stag = val->subtype;
if ( stag == TYPE_VOID )
TypeTag stag = val->val.set_val.vals[0]->type;
IntrusivePtr<BroType> index_type;
if ( stag == TYPE_ENUM )
{
// Enums are not a base-type, so need to look it up.
const auto& sv = val->val.set_val.vals[0]->val.string_val;
std::string enum_name(sv.data, sv.length);
const auto& enum_id = global_scope()->Find(enum_name);
if ( ! enum_id )
{
reporter->Warning("Value '%s' of source '%s' is not a valid enum.",
enum_name.data(), source.c_str());
have_error = true;
return nullptr;
}
index_type = enum_id->GetType();
}
else
index_type = base_type(stag);
set_index = make_intrusive<TypeList>(index_type);
set_index->Append(std::move(index_type));
}
auto s = make_intrusive<SetType>(std::move(set_index), nullptr);
TableVal* t = new TableVal(std::move(s));
for ( int j = 0; j < val->val.set_val.size; j++ )
{
Val* assignval = ValueToVal(source, val->val.set_val.vals[j], have_error);
t->Assign({AdoptRef{}, assignval}, nullptr);
}
return t;
}
case TYPE_VECTOR:
{
IntrusivePtr<BroType> type;
if ( val->val.vector_val.size == 0 && val->subtype == TYPE_VOID )
// don't know type - unspecified table.
type = base_type(TYPE_ANY);
else
{
// all entries have to have the same type...
if ( val->subtype == TYPE_VOID )
type = base_type(val->val.vector_val.vals[0]->type);
else
type = base_type(val->subtype);
}
auto vt = make_intrusive<VectorType>(std::move(type));
auto v = make_intrusive<VectorVal>(std::move(vt));
for ( int j = 0; j < val->val.vector_val.size; j++ )
v->Assign(j, ValueToVal(source, val->val.vector_val.vals[j], have_error));
return v.release();
}
case TYPE_ENUM: {
// Convert to string first to not have to deal with missing
// \0's...
std::string enum_string(val->val.string_val.data, val->val.string_val.length);
// let's try looking it up by global ID.
const auto& id = lookup_ID(enum_string.c_str(), GLOBAL_MODULE_NAME);
if ( ! id || ! id->IsEnumConst() )
{
reporter->Warning("Value '%s' for source '%s' is not a valid enum.",
enum_string.c_str(), source.c_str());
have_error = true;
return nullptr;
}
EnumType* t = id->GetType()->AsEnumType();
int intval = t->Lookup(id->ModuleName(), id->Name());
if ( intval < 0 )
{
reporter->Warning("Enum value '%s' for source '%s' not found.",
enum_string.c_str(), source.c_str());
have_error = true;
return nullptr;
}
auto rval = t->GetVal(intval);
return rval.release();
}
default:
reporter->InternalError("Unsupported type in SerialTypes::ValueToVal from source %s", source.c_str());
}
assert(false);
return nullptr;
}

View file

@ -187,6 +187,26 @@ struct Value {
* method is thread-safe. */
static bool IsCompatibleType(BroType* t, bool atomic_only=false);
/**
* Convenience function to delete an array of value pointers.
* @param vals Array of values
* @param num_fields Number of members
*/
static void delete_value_ptr_array(Value** vals, int num_fields);
/**
* Convert threading::Value to an internal Zeek type, just using the information given in the threading::Value.
*
* @param source Name of the source of this threading value. This is used for warnings that are raised
* in case an error occurs.
* @param val Threading Value to convert to a Zeek Val.
* @param have_error Reference to a boolean. This should be set to false when passed in and is set to true
* in case an error occurs. If this is set to false when the function is called, the function
* immediately aborts.
* @return Val representation of the threading::Value. nullptr on error.
*/
static Val* ValueToVal(const std::string& source, const threading::Value* val, bool& have_error);
private:
friend class ::IPAddr;
Value(const Value& other) { } // Disabled.