分布式 PostgreSQL 集群(Citus)官方示例 - 实时仪表盘

分布式 PostgreSQL 集群(Citus)官方示例 - 实时仪表盘

Citus 提供对大型数据集的实时查询。我们在 Citus 常见的一项工作负载涉及为事件数据的实时仪表板提供支持。

例如,您可以是帮助其他企业监控其 HTTP 流量的云服务提供商。每次您的一个客户端收到 HTTP 请求时,您的服务都会收到一条日志记录。您想要摄取所有这些记录并创建一个 HTTP 分析仪表板,为您的客户提供洞察力,例如他们的网站服务的 HTTP 错误数量。 重要的是,这些数据以尽可能少的延迟显示出来,这样您的客户就可以解决他们网站的问题。 仪表板显示历史趋势图也很重要。

或者,也许您正在建立一个广告网络,并希望向客户展示其广告系列的点击率。 在此示例中,延迟也很关键,原始数据量也很高,历史数据和实时数据都很重要。

在本节中,我们将演示如何构建第一个示例的一部分,但该架构同样适用于第二个和许多其他用例。

real-time-analytics-Hands-On-Lab-Hyperscale-Citus

https://github.com/citusdata/real-time-analytics-Hands-On-Lab-Hyperscale-Citus

Architecting Real-Time Analytics for your Customers

https://github.com/citusdata/postgres-analytics-tutorial

数据模型

我们正在处理的数据是不可变的日志数据流。我们将直接插入 Citus,但这些数据首先通过 Kafka 之类的东西进行路由也很常见。 这样做具有通常的优势,并且一旦数据量变得难以管理,就可以更容易地预先聚合数据。

我们将使用一个简单的 schema 来摄取 HTTP 事件数据。 这个 schema 作为一个例子来展示整体架构;一个真实的系统可能会使用额外的列。

-- this is run on the coordinator CREATE TABLE http_request ( site_id INT, ingest_time TIMESTAMPTZ DEFAULT now(), url TEXT, request_country TEXT, ip_address TEXT, status_code INT, response_time_msec INT ); SELECT create_distributed_table('http_request', 'site_id');

当我们调用 时,我们要求 Citus 使用 site_id 列对 http_request 进行 hash 分配。 这意味着特定站点的所有数据都将存在于同一个分片中。

create_distributed_table

UDF 使用分片计数的默认配置值。我们建议在集群中使用 。使用这么多分片可以让您在添加新的工作节点后重新平衡集群中的数据。

2-4 倍于 CPU 核的分片

Azure Database for PostgreSQL — 超大规模 (Citus) 使用流式复制来实现高可用性,因此维护分片副本将是多余的。在任何流复制不可用的生产环境中,您应该将 citus.shard_replication_factor 设置为 2 或更高以实现容错。

Azure Database for PostgreSQL

https://docs.microsoft.com/azure/postgresql/hyperscale/

流式复制

https://www.postgresql.org/docs/current/static/warm-standby.html

有了这个,系统就可以接受数据并提供查询了! 在继续执行本文中的其他命令时,让以下循环在后台的 psql 控制台中运行。 它每隔一两秒就会生成假数据。

DO $$ BEGIN LOOP INSERT INTO http_request ( site_id, ingest_time, url, request_country, ip_address, status_code, response_time_msec ) VALUES ( trunc(random()*32), clock_timestamp(), concat('http://example.com/', md5(random()::text)), ('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)], concat( trunc(random()*250 + 2), '.', trunc(random()*250 + 2), '.', trunc(random()*250 + 2), '.', trunc(random()*250 + 2) )::inet, ('{200,404}'::int[])[ceil(random()*2)], 5+trunc(random()*150) ); COMMIT; PERFORM pg_sleep(random() * 0.25); END LOOP; END $$;

摄取数据后,您可以运行仪表板查询,例如:

SELECT site_id, date_trunc('minute', ingest_time) as minute, COUNT(1) AS request_count, SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count, SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count, SUM(response_time_msec) / COUNT(1) AS average_response_time_msec FROM http_request WHERE date_trunc('minute', ingest_time) > now() - '5 minutes'::interval GROUP BY site_id, minute ORDER BY minute ASC;

上述设置有效,但有两个缺点:

每次需要生成图表时,您的 HTTP 分析仪表板都必须遍历每一行。 例如,如果您的客户对过去一年的趋势感兴趣,您的查询将从头开始汇总过去一年的每一行。

您的存储成本将随着摄取率和可查询历史的长度成比例增长。 在实践中,您可能希望将原始事件保留较短的时间(一个月)并查看较长时间(年)的历史图表。

汇总

您可以通过将原始数据汇总为预聚合形式来克服这两个缺点。 在这里,我们将原始数据汇总到一个表中,该表存储 1 分钟间隔的摘要。 在生产系统中,您可能还需要类似 1 小时和 1 天的间隔,这些都对应于仪表板中的缩放级别。 当用户想要上个月的请求时间时,仪表板可以简单地读取并绘制过去 30 天每一天的值。

CREATE TABLE http_request_1min ( site_id INT, ingest_time TIMESTAMPTZ, -- which minute this row represents error_count INT, success_count INT, request_count INT, average_response_time_msec INT, CHECK (request_count = error_count + success_count), CHECK (ingest_time = date_trunc('minute', ingest_time)) ); SELECT create_distributed_table('http_request_1min', 'site_id'); CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzgywf.html