同一个 CC 中的账号,在时间上本来就具有很高程度的协同,也就是说,针对 u1u2u3u4 的查询,很有可能是在几乎相同的时间过来的。此时 u1 的查询还没有完全结束,u2u3u4 的查询就已经开始,缓存没有起到作用。
在 CC 计算完成之后,需要将 CC 大小信息写入到该 CC 下的所有节点中,会拖慢响应时间。如果再遇到 1 中所述情况,则短期会面临更大的写入风暴。
另一种常见的思路,是将计算与查询分开。在后台常驻一个进程,用来负责循环更新全图所有节点的 CC 信息,查询则只负责返回缓存结果。
1. 每次更新一个 batch
该 query 每次选取上次更新时间距离当前时间最远的一批节点,然后使用 FOREACH,每次从中挑选 1 个节点,更新这个节点以及该节点所在 CC 的其他节点的 cc_size 。详细可以查看如下代码以及注释。
update_cc_size_in_batch.gsql
CREATE QUERY update_cc_size_in_batch_v2(
INT batch_size=100,
DATETIME start_date_time,
DATETIME end_date_time
) FOR GRAPH MyGraph {
OrAccum<BOOL> @visited;
MaxAccum<DATETIME> @cc_update_time;
MapAccum<VERTEX, BOOL> @@vertex_update_status;
SetAccum<VERTEX> @@batch_accounts;
MinAccum<DATETIME> @@last_update_time;
INT num_cc_updated = 0;
INT num_vertices_updated = 0;
all_accounts = {Account.*};
/*
按照 cc_update_time 排序,选取 cc 信息“最老”的一批节点
*/
samples =
SELECT t
FROM all_accounts:t
ORDER BY t.cc_update_time ASC
LIMIT batch_size
;
samples =
SELECT t
FROM samples:t
POST-ACCUM
@@vertex_update_status += (t -> FALSE),
@@batch_accounts += t,
@@last_update_time += t.cc_update_time
;
/*
每次从 1 个节点出发,找到这个节点所在 CC 的所有节点,将 cc_size
信息更新到每个节点中
*/
FOREACH account IN @@batch_accounts DO
/*
如果当前账号和前面循环中更新的任何一个账号属于同一个 cc,
那么这个账号无须再更新
*/
IF @@vertex_update_status.get(account) == TRUE THEN
CONTINUE;
END;
seed = {account};
comp_vs = seed;
WHILE seed.size() > 0 DO
seed =
SELECT t
FROM seed -(co_ip:e)-> Account:t
WHERE
(t.@visited == FALSE) AND
(e.create_time BETWEEN start_date_time AND end_date_time)
POST-ACCUM
t.@visited = TRUE,
@@vertex_update_status += (t -> TRUE)
;
comp_vs = comp_vs UNION seed;
END;
UPDATE s FROM comp_vs:s
SET s.cc_size = comp_vs.size(),
s.cc_update_time = now()
;
num_cc_updated = num_cc_updated + 1;
num_vertices_updated = num_vertices_updated + comp_vs.size();
END;
PRINT num_cc_updated,
num_vertices_updated,
@@last_update_time AS last_update_time,
now() AS current_time
;
}
最后我们将打印出本轮更新涉及到的 CC 数量,节点数量,以及本轮更新之前,该批次节点中,"最早的 CC 更新时间",以及 TigerGraph 系统当前时间。如果"最早的CC更新时间"与当前时间差异很小,说明数据库中所有节点的 cc_size 信息,都很新鲜,此时可以考虑降低该Query的调用频次,如果差异很大,则可以立马继续调用该 Query。
注意到 query 中有两个参数,start_date_time 与 end_date_time,这两个参数用来筛选出在这个时间区间内创建的边,这样可以避免一个 CC 无限增长下去。
上述方法,每次取样一批节点,更新这些节点所处的 CC 中所有节点的 cc_size 信息,通过多次调用,以更新所有节点的 cc_size。由于每一次调用时取样的节点不同,更新所涉及的节点数量也不同,因此计算所需用时也不同。
CREATE QUERY update_cc_size_whole_graph(
DATETIME start_date_time,
DATETIME end_date_time
) FOR GRAPH MyGraph {
MinAccum<INT> @cc_id = 0;
MinAccum<INT> @old_id = 0;
OrAccum<BOOL> @active;
MapAccum<INT, INT> @@comp_sizes;
all_accounts = {Account.*};
start =
SELECT s
FROM all_accounts:s
POST-ACCUM
s.@cc_id = getvid(s),
s.@old_id = getvid(s)
;
WHILE start.size() > 0 DO
start =
SELECT t
FROM start:s -(co_ip:e)-> Account:t
WHERE (e.create_time BETWEEN start_date_time AND end_date_time)
ACCUM t.@cc_id += s.@cc_id
POST-ACCUM
CASE
WHEN
t.@old_id != t.@cc_id
THEN
t.@old_id = t.@cc_id,
t.@active = TRUE
ELSE
t.@active = FALSE
END
HAVING t.@active == TRUE
;
END;
all_accounts =
SELECT s
FROM all_accounts:s
POST-ACCUM @@comp_sizes += (s.@cc_id -> 1)
;
UPDATE s FROM all_accounts:s
SET s.cc_size = @@comp_sizes.get(s.@cc_id),
s.cc_update_time = now()
;
PRINT
@@comp_sizes.size() AS num_cc_updated,
all_accounts.size() AS num_vertices_updated,
now() AS current_time
;
}
这种方法从全图所有的节点同时出发,通知自己的邻居,自己的 vid 是多少,每个节点对比自己的 vid 和邻居的 vid,将较小的 vid 设置为自己的 cc_id,然后进行下一次迭代。每次迭代之后,只保留在本轮迭代中,更新过 cc_id 的节点,直到全图所有节点的 cc_id 都不再变化,则算法结束。
实验用的数据统计如下:
Type
Number
Total Vertex
95,827
Total Edge
92,413
Vertex "Account"
95,827
Edge "co_ip"
92,413
在相同的机器上,使用前面所说的 batch 更新方法,更新全图所有节点的 cc 信息,共需要约 15 秒,使用全图更新算法,用时约 5 秒。
CREATE QUERY delete_edges(
STRING edge_type,
DATETIME start_date_time,
DATETIME end_date_time
) FOR GRAPH MyGraph {
V = {ANY};
DELETE e
FROM V -(:e)-> ANY
WHERE (e.type == edge_type) AND
(e.create_time BETWEEN start_date_time AND end_date_time)
;
}