数据加载
之前的案例中,介绍了 TigerGraph 从文件加载以及 HTTP 导入数据的方法,本案例会再介绍另一种数据导入方式,即从 Kafka 中导入。
从 Kafka 导入边,最大的好处是可以很大程度提升数据的时效性,如果原始日志也是从 Kafka 中拉取,配合上一节介绍的 Pipeline,即可以以接近实时的方式,完成边的创建工作。
创建 Data Loading Job
/*
sample edge in json format:
{
"src_node": "u1",
"tgt_node": "u2",
"edge_type": "co_ip",
"edge_attrs": {
"context", "1.1.1.1",
"create_time": 1583024431,
"time_diff": 30
}
}
co_ip edge schema:
UNDIRECTED EDGE co_ip(FROM Account, TO Account, ip STRING, create_time DATETIME, time_diff INT)
*/
DROP JOB load_co_context_edges_json
CREATE LOADING JOB load_co_context_edges_json FOR GRAPH MyGraph {
DEFINE FILENAME jsonfile;
LOAD jsonfile
TO EDGE co_ip VALUES (
$"src_node",
$"tgt_node",
$"edge_attrs":"context",
REDUCE(max($"edge_attrs":"create_time")),
REDUCE(min($"edge_attrs": "time_diff"))
) WHERE $"edge_type" == "co_ip"
USING JSON_FILE="true";
}
$ gsql -g MyGraph co_context_edges_loader.gsql
co_ip
边第三个字段需要填写的是 ip
信息,他来自于 JSON 数据中的 edge_attrs
下的 context
字段,可以通过冒号来访问下一层级的字段。比如这里,通过 $"edge_attrs":"context"
来获得 ip
信息。
在 TigerGraph 中,两个节点之间,相同类型的边,只能有一条。比如 u1
和 u2
之间,只能有一条 co_ip
边,如果数据中出现了多条,那么后面的数据将覆盖之前的数据。对于 co_ip
边,两个账号非常有可能在后续的时间再次在同一个 IP 下共现的,这里我们希望将 create_time
设置成他们最后一次共现的时间,将 time_diff
设置成他们曾今时间最接近的时候的时间差。这里我们就需要用到 Reducer Functions。
REDUCE(max($"edge_attrs":"create_time"))
用来保留最大的 create_time
REDUCE(min($"edge_attrs": "time_diff"))
用来保留最小的 time_diff
除了min
和 max
外,还有很多 Reducer Functions,譬如可以用 add
来统计这条边出现过来几次,等等。详细可以在 TigerGraph 官方文档中搜索 Reducer Functions。
之前有说过,除了 co_ip
边外,后续可能还有其他的 co_context
边,即数据源中除了 co_ip
外还有其他类型的边,因此我们需要增加一个过滤条件:
WHERE $"edge_type" == "co_ip"
创建 Kafka Data Source
首先,需要写一个配置文件,用来定义 Kafka 集群的地址,消费组等信息。假设集群地址为 192.168.5.90:9092
,给 TigerGraph 定义的消费组 id 为 tigergraph
:
{
"broker": "192.168.5.60:9092",
"kafka_config": {
"group.id": "tigergraph"
}
}
将该配置文件放置在 TigerGraph 所在服务器的 /home/tigergraph
目录下,进入 GSQL 客户端:
GSQL-Dev > CREATE DATA_SOURCE KAFKA k1 = "/home/tigergraph/kafka.config" FOR GRAPH MyGraph
上述命令定义里一个名为 k1
的 DATA_SOURCE
配置 Topic 信息
除了 DATA_SOURCE 外,我们还需要配置之后要消费的 Topic
在 TigerGraph 所在服务器的 /home/tigergraph
目录下,创建一个关于 co-context
边的 topics 配置。假设我们在 ETL Pipeline 的最后,将生成的边存放到了名为 co-context-edges
的 topic 中:
{
"topic": "co-context-edges",
"partition_list": [
{
"start_offset": -1,
"partition": 0
}
]
}
上面 start_offset = -1
表示,从这个队列中最新的数据开始消费,start_offset = -2
表示,从这个队列中第一条消息开始消费。
执行 Loading Job
进入 GSQL 客户端
GSQL-Dev > USE GRAPH MyGraph
Using graph 'MyGraph'
GSQL-Dev > RUN LOADING JOB load_co_context_edges_json USING jsonfile="$k1:/home/tigergraph/kafka_co_context_edges_topic_partition.conf"Try to list topic metadata from Kafka broker '192.168.5.60:9092', timeout: 3 sec ...
[Tip: Use "CTRL + C" to stop displaying the loading status update, then use "SHOW LOADING STATUS jobid" to track the loading progress again]
[Tip: Manage loading jobs with "ABORT/RESUME LOADING JOB jobid"]
Starting the following job, i.e.
JobName: load_co_context_edges_json, jobid: MyGraph.load_co_context_edges_json.kafka.k1.1.1585207057408
Loading log: '/home/tigergraph/tigergraph/logs/restpp/restpp_kafka_loader_logs/MyGraph/MyGraph.load_co_context_edges_json.kafka.k1.1.1585207057408.log'
Job "MyGraph.load_co_context_edges_json.kafka.k1.1.1585207057408" loading status
[RUNNING] m1 ( Total: 1 )
+---------------------------------------------------------------------------------+
| TOPIC PARTITION | LOADED MESSAGES | AVG SPEED | DURATION | LOADED SIZE|
|co-context-edges:0 | 149600 | 51 kl/s | 2.90 s | 2.94 MB|
+---------------------------------------------------------------------------------+
Job "MyGraph.load_co_context_edges_json.kafka.k1.1.1585205329349" loading status
Job 后面跟着的是 jobid
,可以看到从 job 开始起,一共消费了 149600 条消息,平均每秒消费 5 万条消息,这速度还是挺惊人的。
后续可以通过这个 jobid
查看最新的运行情况。
$ gsql -g MyGraph "SHOW LOADING STATUS MyGraph.load_co_context_edges_json.kafka.k1.1.1585205329349"
Connecting to 192.168.5.60
If there is any relative path, it is relative to tigergraph/dev/gdk/gsql
Job "MyGraph.load_co_context_edges_json.kafka.k1.1.1585207057408" loading status
[RUNNING] m1 ( Total: 1 )
+---------------------------------------------------------------------------------+
| TOPIC PARTITION | LOADED MESSAGES | AVG SPEED | DURATION | LOADED SIZE|
|co-context-edges:0 | 149600 | 51 kl/s | 2.90 s | 2.94 MB|
+---------------------------------------------------------------------------------+
如果忘记了 jobid
也没有关系,直接用 SHOW LOADING STATUS ALL
。
最后更新于
这有帮助吗?