数据加载

之前的案例中,介绍了 TigerGraph 从文件加载以及 HTTP 导入数据的方法,本案例会再介绍另一种数据导入方式,即从 Kafka 中导入。

从 Kafka 导入边,最大的好处是可以很大程度提升数据的时效性,如果原始日志也是从 Kafka 中拉取,配合上一节介绍的 Pipeline,即可以以接近实时的方式,完成边的创建工作。

注意: 这里说的从 Kafka 导入边,指的是在前面所说的 ETL Pipeline 中,将最后产出的边,存放在 Kafka 的一个 Topic 中,TigerGraph 会自动从该 Topic 中将新生成的边同步到数据库里。

创建 Data Loading Job

co_context_edges_loader.gsql
/*
sample edge in json format:

{
  "src_node": "u1", 
  "tgt_node": "u2", 
  "edge_type": "co_ip", 
  "edge_attrs": {
      "context", "1.1.1.1",
      "create_time": 1583024431, 
      "time_diff": 30
  }
 }
 
 co_ip edge schema:
 
 UNDIRECTED EDGE co_ip(FROM Account, TO Account, ip STRING, create_time DATETIME, time_diff INT)
 
*/
DROP JOB load_co_context_edges_json
CREATE LOADING JOB load_co_context_edges_json FOR GRAPH MyGraph {
    DEFINE FILENAME jsonfile;
    LOAD jsonfile
        TO EDGE co_ip VALUES (
            $"src_node", 
            $"tgt_node", 
            $"edge_attrs":"context", 
            REDUCE(max($"edge_attrs":"create_time")),
            REDUCE(min($"edge_attrs": "time_diff"))
        ) WHERE $"edge_type" == "co_ip"
    USING JSON_FILE="true";
}

co_ip 边第三个字段需要填写的是 ip 信息,他来自于 JSON 数据中的 edge_attrs 下的 context 字段,可以通过冒号来访问下一层级的字段。比如这里,通过 $"edge_attrs":"context" 来获得 ip 信息。

在 TigerGraph 中,两个节点之间,相同类型的边,只能有一条。比如 u1u2 之间,只能有一条 co_ip 边,如果数据中出现了多条,那么后面的数据将覆盖之前的数据。对于 co_ip 边,两个账号非常有可能在后续的时间再次在同一个 IP 下共现的,这里我们希望将 create_time 设置成他们最后一次共现的时间,将 time_diff 设置成他们曾今时间最接近的时候的时间差。这里我们就需要用到 Reducer Functions

REDUCE(max($"edge_attrs":"create_time")) 用来保留最大的 create_time

REDUCE(min($"edge_attrs": "time_diff")) 用来保留最小的 time_diff

除了minmax 外,还有很多 Reducer Functions,譬如可以用 add 来统计这条边出现过来几次,等等。详细可以在 TigerGraph 官方文档中搜索 Reducer Functions。

之前有说过,除了 co_ip 边外,后续可能还有其他的 co_context 边,即数据源中除了 co_ip 外还有其他类型的边,因此我们需要增加一个过滤条件:

WHERE $"edge_type" == "co_ip"

创建 Kafka Data Source

首先,需要写一个配置文件,用来定义 Kafka 集群的地址,消费组等信息。假设集群地址为 192.168.5.90:9092 ,给 TigerGraph 定义的消费组 id 为 tigergraph :

将该配置文件放置在 TigerGraph 所在服务器的 /home/tigergraph 目录下,进入 GSQL 客户端:

上述命令定义里一个名为 k1 的 DATA_SOURCE

配置 Topic 信息

除了 DATA_SOURCE 外,我们还需要配置之后要消费的 Topic

在 TigerGraph 所在服务器的 /home/tigergraph 目录下,创建一个关于 co-context 边的 topics 配置。假设我们在 ETL Pipeline 的最后,将生成的边存放到了名为 co-context-edges 的 topic 中:

上面 start_offset = -1 表示,从这个队列中最新的数据开始消费,start_offset = -2 表示,从这个队列中第一条消息开始消费。

执行 Loading Job

进入 GSQL 客户端

Job 后面跟着的是 jobid ,可以看到从 job 开始起,一共消费了 149600 条消息,平均每秒消费 5 万条消息,这速度还是挺惊人的。

后续可以通过这个 jobid 查看最新的运行情况。

如果忘记了 jobid 也没有关系,直接用 SHOW LOADING STATUS ALL

最后更新于

这有帮助吗?