mirror of
https://github.com/zeek/zeek.git
synced 2025-10-02 14:48:21 +00:00
BrokerStore<->Zeek tables: load persistent tables on startup.
This currently only handles the most basic case, and is not thoroughly tested. When initializing a master store, we now check if there already is data in it. If yes, we load it directly into the zeek table when the store is created. We assume that this is happening at Zeek startup - and are supremely evil and just load it synchronously. Which could block execution for a bit for larger stores. That being said - this might sidestep other issues that would arise when doing this async (like scripts already inserting data). Next step: check if this approach also works for clones.
This commit is contained in:
parent
38a3d67643
commit
b9fe79c697
5 changed files with 180 additions and 5 deletions
|
@ -1608,15 +1608,66 @@ StoreHandleVal* Manager::MakeMaster(const string& name, broker::backend type,
|
||||||
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
|
iosource_mgr->RegisterFd(handle->proxy.mailbox().descriptor(), this);
|
||||||
CheckForwarding(name);
|
CheckForwarding(name);
|
||||||
|
|
||||||
if ( bstate->endpoint.use_real_time() )
|
if ( ! bstate->endpoint.use_real_time() )
|
||||||
return handle;
|
|
||||||
|
|
||||||
// Wait for master to become available/responsive.
|
// Wait for master to become available/responsive.
|
||||||
// Possibly avoids timeouts in scripts during unit tests.
|
// Possibly avoids timeouts in scripts during unit tests.
|
||||||
handle->store.exists("");
|
handle->store.exists("");
|
||||||
|
|
||||||
|
BrokerStoreToZeekTable(name, handle);
|
||||||
|
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Manager::BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle)
|
||||||
|
{
|
||||||
|
// consider if it might be wise to disable &on_change while filling the table
|
||||||
|
if ( ! handle->forward_to )
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto keys = handle->store.keys();
|
||||||
|
if ( ! keys )
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto set = caf::get_if<broker::set>(&(keys->get_data()));
|
||||||
|
auto table = handle->forward_to;
|
||||||
|
const auto& its = table->GetType()->AsTableType()->IndexTypes();
|
||||||
|
bool is_set = table->GetType()->IsSet();
|
||||||
|
assert( its.size() == 1 );
|
||||||
|
for ( const auto key : *set )
|
||||||
|
{
|
||||||
|
auto zeek_key = data_to_val(key, its[0].get());
|
||||||
|
if ( ! zeek_key )
|
||||||
|
{
|
||||||
|
reporter->Error("Failed to convert key %s while importing broker store to table for store %s. Aborting import.", to_string(key).c_str(), name.c_str());
|
||||||
|
// just abort - this probably means the types are incompatible
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( is_set )
|
||||||
|
{
|
||||||
|
table->Assign(zeek_key, nullptr, false);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto value = handle->store.get(key);
|
||||||
|
if ( ! value )
|
||||||
|
{
|
||||||
|
reporter->Error("Failed to load value for key %s while importing broker store %s to table", to_string(key).c_str(), name.c_str());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto zeek_value = data_to_val(*value, table->GetType()->Yield().get());
|
||||||
|
if ( ! zeek_value )
|
||||||
|
{
|
||||||
|
reporter->Error("Could not convert %s to table value while trying to import broker store %s. Aborting import.", to_string(value).c_str(), name.c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
table->Assign(zeek_key, zeek_value, false);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
|
StoreHandleVal* Manager::MakeClone(const string& name, double resync_interval,
|
||||||
double stale_interval,
|
double stale_interval,
|
||||||
double mutation_buffer_interval)
|
double mutation_buffer_interval)
|
||||||
|
|
|
@ -355,7 +355,11 @@ private:
|
||||||
void ProcessError(broker::error err);
|
void ProcessError(broker::error err);
|
||||||
void ProcessStoreResponse(StoreHandleVal*, broker::store::response response);
|
void ProcessStoreResponse(StoreHandleVal*, broker::store::response response);
|
||||||
void FlushPendingQueries();
|
void FlushPendingQueries();
|
||||||
|
// Check if a broker store is associated to a table on the Zeek side.
|
||||||
void CheckForwarding(const std::string& name);
|
void CheckForwarding(const std::string& name);
|
||||||
|
// Send the content of a broker store to the backing table. This is typically used
|
||||||
|
// when a master/clone is created.
|
||||||
|
void BrokerStoreToZeekTable(const std::string& name, const StoreHandleVal* handle);
|
||||||
|
|
||||||
void Error(const char* format, ...)
|
void Error(const char* format, ...)
|
||||||
__attribute__((format (printf, 2, 3)));
|
__attribute__((format (printf, 2, 3)));
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
{
|
||||||
|
[b] = 3,
|
||||||
|
[whatever] = 5,
|
||||||
|
[a] = 5
|
||||||
|
}
|
||||||
|
{
|
||||||
|
I am really a set!,
|
||||||
|
Believe me - I am a set,
|
||||||
|
I am a set!
|
||||||
|
}
|
||||||
|
{
|
||||||
|
[b] = [a=2, b=d, c={
|
||||||
|
elem1,
|
||||||
|
elem2
|
||||||
|
}],
|
||||||
|
[a] = [a=1, b=c, c={
|
||||||
|
elem1,
|
||||||
|
elem2
|
||||||
|
}]
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
{
|
||||||
|
[b] = 3,
|
||||||
|
[whatever] = 5,
|
||||||
|
[a] = 5
|
||||||
|
}
|
||||||
|
{
|
||||||
|
I am really a set!,
|
||||||
|
Believe me - I am a set,
|
||||||
|
I am a set!
|
||||||
|
}
|
||||||
|
{
|
||||||
|
[b] = [a=2, b=d, c={
|
||||||
|
elem1,
|
||||||
|
elem2
|
||||||
|
}],
|
||||||
|
[a] = [a=1, b=c, c={
|
||||||
|
elem1,
|
||||||
|
elem2
|
||||||
|
}]
|
||||||
|
}
|
80
testing/btest/broker/store/brokerstore-attr-persistence.zeek
Normal file
80
testing/btest/broker/store/brokerstore-attr-persistence.zeek
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
# @TEST-PORT: BROKER_PORT
|
||||||
|
|
||||||
|
# @TEST-EXEC: zeek -B broker -b one.zeek > output1
|
||||||
|
# @TEST-EXEC: zeek -B broker -b two.zeek > output2
|
||||||
|
# @TEST-EXEC: btest-diff output1
|
||||||
|
# @TEST-EXEC: btest-diff output2
|
||||||
|
# @TEST-EXEC: diff output1 output2
|
||||||
|
|
||||||
|
# the first test writes out the sqlite files...
|
||||||
|
|
||||||
|
@TEST-START-FILE one.zeek
|
||||||
|
|
||||||
|
module TestModule;
|
||||||
|
|
||||||
|
global tablestore: opaque of Broker::Store;
|
||||||
|
global setstore: opaque of Broker::Store;
|
||||||
|
global recordstore: opaque of Broker::Store;
|
||||||
|
|
||||||
|
type testrec: record {
|
||||||
|
a: count;
|
||||||
|
b: string;
|
||||||
|
c: set[string];
|
||||||
|
};
|
||||||
|
|
||||||
|
global t: table[string] of count &broker_store="table";
|
||||||
|
global s: set[string] &broker_store="set";
|
||||||
|
global r: table[string] of testrec &broker_store="rec";
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
tablestore = Broker::create_master("table", Broker::SQLITE);
|
||||||
|
setstore = Broker::create_master("set", Broker::SQLITE);
|
||||||
|
recordstore = Broker::create_master("rec", Broker::SQLITE);
|
||||||
|
t["a"] = 5;
|
||||||
|
t["b"] = 3;
|
||||||
|
t["c"] = 4;
|
||||||
|
t["whatever"] = 5;
|
||||||
|
delete t["c"];
|
||||||
|
add s["I am a set!"];
|
||||||
|
add s["I am really a set!"];
|
||||||
|
add s["Believe me - I am a set"];
|
||||||
|
r["a"] = testrec($a=1, $b="b", $c=set("elem1", "elem2"));
|
||||||
|
r["a"] = testrec($a=1, $b="c", $c=set("elem1", "elem2"));
|
||||||
|
r["b"] = testrec($a=2, $b="d", $c=set("elem1", "elem2"));
|
||||||
|
print t;
|
||||||
|
print s;
|
||||||
|
print r;
|
||||||
|
}
|
||||||
|
|
||||||
|
@TEST-END-FILE
|
||||||
|
@TEST-START-FILE two.zeek
|
||||||
|
|
||||||
|
# the second one reads them in again
|
||||||
|
|
||||||
|
module TestModule;
|
||||||
|
|
||||||
|
global tablestore: opaque of Broker::Store;
|
||||||
|
global setstore: opaque of Broker::Store;
|
||||||
|
global recordstore: opaque of Broker::Store;
|
||||||
|
|
||||||
|
type testrec: record {
|
||||||
|
a: count;
|
||||||
|
b: string;
|
||||||
|
c: set[string];
|
||||||
|
};
|
||||||
|
|
||||||
|
global t: table[string] of count &broker_store="table";
|
||||||
|
global s: set[string] &broker_store="set";
|
||||||
|
global r: table[string] of testrec &broker_store="rec";
|
||||||
|
|
||||||
|
event zeek_init()
|
||||||
|
{
|
||||||
|
tablestore = Broker::create_master("table", Broker::SQLITE);
|
||||||
|
setstore = Broker::create_master("set", Broker::SQLITE);
|
||||||
|
recordstore = Broker::create_master("rec", Broker::SQLITE);
|
||||||
|
print t;
|
||||||
|
print s;
|
||||||
|
print r;
|
||||||
|
}
|
||||||
|
@TEST-END-FILE
|
Loading…
Add table
Add a link
Reference in a new issue