②Structured Streaming脚本建立
#!/usr/bin/env python3
"""Structured Streaming job: Kafka topic -> regex extraction -> CSV file sink.

Reads the ``cdczztar`` topic from the Kafka broker at ``localhost:9092``,
casts each record's value to a string, extracts the text that follows
``data":`` with a regular expression, and appends the result as CSV files
under ``file:///tmp/filesink`` using a 10-second processing-time trigger.
"""
import re
from functools import partial

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract


def main():
    """Build the streaming query and block until it terminates."""
    spark = (
        SparkSession.builder
        .appName("StructuredKafkaWordCount")
        .getOrCreate()
    )
    # Only show warnings and above.
    spark.sparkContext.setLogLevel('WARN')

    # NOTE: comments must not follow a line-continuation backslash (that is a
    # SyntaxError), so the chained calls are wrapped in parentheses instead.
    # The original author notes that with the older Spark Streaming API the
    # equivalent would be KafkaUtils.createDirectStream.
    lines = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", 'cdczztar')  # topic to consume
        .load()
        .selectExpr("CAST(value AS STRING)")
    )
    # lines.printSchema()

    # Regex processing; adapt it to the actual data. Per the original author,
    # the Kafka payload is an Oracle change log and only the inserted row
    # values are extracted here.
    pattern = 'data":(.+)}'
    fields = partial(regexp_extract, str="value", pattern=pattern)
    words = lines.select(fields(idx=1).alias("values"))

    # Output mode: append the extracted values to files.
    query = (
        words.writeStream
        .outputMode("append")
        .format("csv")
        .option("path", "file:///tmp/filesink")  # output directory on the server
        .option("checkpointLocation", "file:///tmp/file-sink-cp")
        .trigger(processingTime="10 seconds")
        .start()
    )
    query.awaitTermination()


if __name__ == "__main__":
    main()

# Run it from a new terminal window on the server (here the working directory
# is already the code directory):
#   /usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 spark.py
# NOTE(review): the --packages coordinates presumably have to match the
# installed Spark / Scala versions - confirm for your cluster.
#
# ③ Run the Python script that opens each newly written file as it appears,
#    extracts the information and pushes it to WeChat.
"""Watch the Spark file-sink directory and push each new record to WeChat Work."""
import csv
import datetime
import json
import logging
import time

import pandas as pd
import pyinotify  # Linux-only; on Windows use watchdog, which works the same way
import requests

CORPID = "******"  # WeChat Work corp id
SECRET = "*******"  # WeChat Work app secret
AGENTID = 1000041  # WeChat Work application agent id

# Seconds to wait for each HTTP call so a hung request cannot block the
# notifier loop forever.
REQUEST_TIMEOUT = 10

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

multi_event = pyinotify.IN_CREATE  # only watch file-creation events
wm = pyinotify.WatchManager()


class MyHandler(pyinotify.ProcessEvent):
    """pyinotify handler that overrides ``process_IN_CREATE``."""

    def send_msg_to_wechat(self, content):
        """Send ``content`` as a text message through the WeChat Work API.

        Fetches an access token with ``CORPID`` / ``SECRET`` and then posts
        the message for ``AGENTID``.

        Returns:
            The decoded JSON response of the send call when it answers with
            HTTP 200, otherwise ``None``. ``None`` is also returned (and no
            message is posted) when the access token cannot be obtained.
        """
        started = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with requests.Session() as s:
            url1 = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={0}&corpsecret={1}".format(CORPID, SECRET)
            rep = s.get(url1, timeout=REQUEST_TIMEOUT)
            if rep.status_code != 200:
                logger.error("%s: token request failed with HTTP %s",
                             started, rep.status_code)
                return None
            token_reply = json.loads(rep.content)
            token = token_reply.get('access_token')
            if not token:
                # Without a token the send call cannot succeed, so stop
                # here instead of posting with access_token=None.
                logger.error("%s: no access_token in reply: %s",
                             started, token_reply)
                return None

            url2 = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(token)
            header = {
                "Content-Type": "application/json"
            }
            form_data = {
                "touser": "@all",
                "toparty": " PartyID1 | PartyID2 ",
                "totag": " TagID1 | TagID2 ",
                "msgtype": "text",
                "agentid": AGENTID,
                "text": {
                    "content": content
                },
                "safe": 0
            }
            rep = s.post(url2,
                         data=json.dumps(form_data).encode('utf-8'),
                         headers=header,
                         timeout=REQUEST_TIMEOUT)
            if rep.status_code != 200:
                logger.error("%s: send failed with HTTP %s",
                             started, rep.status_code)
                return None
            res = json.loads(rep.content)
            # The API reports application-level failures through "errcode"
            # in the body even when the HTTP status is 200.
            if res.get('errcode', 0) != 0:
                logger.warning("%s: send rejected: %s", started, res)
            else:
                logger.info("%s: message sent", started)
            return res

    def process_IN_CREATE(self, event):
        """Read a newly created sink file and push its first row to WeChat.

        Paths containing ``_spark_metadata`` or ``.crc`` are ignored. Any
        failure is logged and swallowed so a single bad file does not stop
        the watcher.
        """
        f_path = event.pathname
        if '_spark_metadata' in f_path or '.crc' in f_path:
            return
        print(f_path)
        # Pitfall noted by the original author: the event fires as soon as
        # the streaming job creates the file, before any data is written;
        # without this wait the file is read while still empty.
        time.sleep(5)
        try:
            # NOTE(review): this separator looks like an artifact of the page
            # the code was copied from - confirm the intended value. As long
            # as it never occurs in the data, each line is read as a single
            # 'value' column.
            df = pd.read_csv(f_path, encoding='utf8', names=['value'], sep='http://www.likecs.com/')
            if df.empty:
                logger.warning("no rows in %s after waiting; skipped", f_path)
                return
            # Only the first row of the file is sent.
            send_str = df.iloc[0, 0].replace('\\', '').replace(',"before":null}', '').replace('"', '')
            print(send_str)
            self.send_msg_to_wechat('中间库预警:' + send_str)
        except Exception:
            # Best-effort: keep the notifier loop alive, but record the error
            # instead of hiding it.
            logger.exception("failed to process %s", f_path)


handler = MyHandler()
notifier = pyinotify.Notifier(wm, handler)
wm.add_watch('/tmp/filesink/', multi_event)
notifier.loop()

# The message received on the WeChat side looks like this:
四、问题点
还有下面几个问题还没实现,有思路还请随时评论私信交流,感谢
在 Structured Streaming 消费了 Kafka 消息之后,是否可以不经过文件中转,直接把消息推送到微信端?
Python 监控目录时,新增文件的路径可以即时获取,但要读取文件内容必须等数据写入完成;用 sleep 等待的方式并不稳定,是否有方法可以判断数据已经写完,然后再读取该文件?
学习交流,有任何问题还请随时评论指出交流。