确保elasticsearch已部署好;
执行以下语句即可创建es表,请按照您自己的es信息调整下面的参数:
CREATE TABLE pv_per_minute ( start_time STRING, end_time STRING, pv_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch', -- 类型 'connector.version' = '6', -- elasticsearch版本 'connector.hosts' = 'http://192.168.133.173:9200', -- elasticsearch地址 'connector.index' = 'pv_per_minute', -- 索引名,相当于数据库表名 'connector.document-type' = 'user_behavior', -- type,相当于数据库库名 'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新 'format.type' = 'json', -- 输出数据格式json 'update-mode' = 'append' );执行以下语句,就会将每分钟的pv总数写入es的pv_per_minute索引:
INSERT INTO pv_per_minute SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time, DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time, COUNT(*) AS pv_cnt FROM user_behavior WHERE behavior = 'pv' GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);用es-head查看,发现数据已成功写入:
当前user_behavior表的category_id表示商品类目,例如11120表示计算机书籍,61626表示牛仔裤,本次实战的数据集中,这样的类目共有五千多种;
如果我们将这五千多种类目分成6个大类,例如11120属于教育类,61626属于服装类,那么应该有个大类和类目的关系表;
这个大类和类目的关系表在MySQL创建,表名叫category_info,建表语句如下:
CREATE TABLE `category_info`( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `parent_id` bigint , `category_id` bigint , PRIMARY KEY ( `id` ) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;表category_info所有数据来自对原始数据中category_id字段的提取,并且随机将它们划分为6个大类,该表的数据请在我的GitHub下载:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql
请在MySQL上建表category_info,并将上述数据全部写进去;
在Flink SQL Client执行以下语句创建这个维表,mysql信息请按您自己配置调整:
CREATE TABLE category_info ( parent_id BIGINT, -- 商品大类 category_id BIGINT -- 商品详细类目 ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo', 'connector.table' = 'category_info', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = '123456', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' );尝试联表查询:
SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id;如下图,联表查询成功,每条记录都能对应大类:
再试试联表统计,每个大类的总浏览量:
SELECT C.parent_id, COUNT(*) AS pv_count FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id WHERE behavior = 'pv' GROUP BY C.parent_id;如下图,数据是动态更新的:
执行以下语句,可以在统计时将大类ID转成中文名:
SELECT CASE C.parent_id WHEN 1 THEN '服饰鞋包' WHEN 2 THEN '家装家饰' WHEN 3 THEN '家电' WHEN 4 THEN '美妆' WHEN 5 THEN '母婴' WHEN 6 THEN '3C数码' ELSE '其他' END AS category_name, COUNT(*) AS pv_count FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id WHERE behavior = 'pv' GROUP BY C.parent_id;效果如下图:
至此,我们借助Flink SQL Client体验了Flink SQL丰富的功能,如果您也在学习Flink SQL,希望本文能给您一些参考; 欢迎关注公众号:程序员欣宸
微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos