使用 Qt 获取 UDP 数据并显示成图片

一个项目,要接收 UDP 数据包,解析并获取其中的数据,主要根据解析出来的行号和序号将数据拼接起来,然后将拼接起来的数据(最重要的数据是 R、G、B 三个通道的像素值)显示在窗口中。考虑到每秒钟要接收的数据包的数量较大,Python 的处理速度可能没有那么快,而且之前对 Qt 也比较熟悉了,所以用Qt 作为客户端接收处理数据包,用近期学习的 Python 模拟发送数据包。

数据格式

在 TCP/IP 协议中,UDP 数据包的大小是由限制的,因此用 UDP 传输数据时,还要在 UDP 层上再封装一层自定义的协议。这个自定义的协议比较简单,每个 UDP 包的大小为 1432 个字节,分为几个部分:

部分 起始字节 字节长度 说明
Start   0   4   包头部的 Magic Number,设为 0x53746172  
PartialCnt   4   1   分包总数,一个字节(0-255)以内  
PartialIdx   5   1   分包序号  
SampleLine   6   1   采样率  
RGB   7   1   rgb 通道标识符  
LineIdx   8   4   行号,每一行可以包含 RGB 三个通道的数据,每个通道由多个分包组成  
ValidDataLen   12   4   数据部分有效字节数  
LineBytes   16   4   每行数据包含的字节总数  
Reserve   20   128   保留部分  
Data   148   1280   数据部分  
end   1428   4   包尾部的 Magic Number,设为 0x54456e64  

上述表格描述的就是一个完整的 UDP 包。这里的一个 UDP 数据包包含的是 RGB 某个通道的某一部分的数据。换种说法:

一行数据

R 通道数据(若干个分包组成)

G 通道数据(若干个分包组成)

B 通道数据(若干个分包组成)

所以要生成/解析 UDP 包,最重要的是 PartialCnt、PartialIdx、RGB、LineIdx、Data 这几个部分。清楚了自定义协议就可以开始编写模拟包的生成和相应的接收逻辑了。

使用 Python 模拟 UDP 发包

由于本地开发的时候缺少必要的硬件环境,为了方便开发,用 Python 编写一个简单的 UDPServer,发送模拟生成的数据包。根据上述协议,可以写出如下的 CameraData 类来表示 UDP 数据包:

# -*- coding: utf-8 -*- DATA_START_MAGIC = bytearray(4) DATA_START_MAGIC[0] = 0x53 # S DATA_START_MAGIC[1] = 0x74 # t DATA_START_MAGIC[2] = 0x61 # a DATA_START_MAGIC[3] = 0x72 # r DATA_END_MAGIC = bytearray(4) DATA_END_MAGIC[0] = 0x54 # T DATA_END_MAGIC[1] = 0x45 # E DATA_END_MAGIC[2] = 0x6e # n DATA_END_MAGIC[3] = 0x64 # d slice_start_magic = slice(0, 4) slice_partial_cnt = 4 slice_partial_idx = 5 slice_sample_line = 6 slice_rgb_extern = 7 slice_line_idx = slice(8, 12) slice_valid_data_len = slice(12, 16) slice_line_bytes = slice(16, 20) slice_resv = slice(20, 148) slice_data = slice(148, 1428) slice_end_magic = slice(1428, 1432) import numpy as np class CameraData(object): def __init__(self): # self.new() # self.rawdata = rawdata self.dataLow = 10 self.dataHigh = 20 self.new() def genRandomByte(self, by=4): r = bytearray(by) for i in range(by): r[i] = np.random.randint(0, 255) def setPackageIdx(self, i = 0): self.rawdata[slice_partial_idx] = i def setRGB(self, c = 1): self.rawdata[slice_rgb_extern] = c def setLineIdx(self, line): start = slice_line_idx.start self.rawdata[start+3] = 0x000000ff & line self.rawdata[start+2] = (0x0000ff00 & line) >> 8 self.rawdata[start+1] = (0x00ff0000 & line) >> 16 self.rawdata[start+0] = (0xff000000 & line) >> 24 def setValidDataLen(self, len): start = slice_valid_data_len.start self.rawdata[start+3] = 0x000000ff & len self.rawdata[start+2] = (0x0000ff00 & len) >> 8 self.rawdata[start+1] = (0x00ff0000 & len) >> 16 self.rawdata[start+0] = (0xff000000 & len) >> 24 def setLineBytes(self, len): start = slice_line_bytes.start self.rawdata[start+3] = 0x000000ff & len self.rawdata[start+2] = (0x0000ff00 & len) >> 8 self.rawdata[start+1] = (0x00ff0000 & len) >> 16 self.rawdata[start+0] = (0xff000000 & len) >> 24 def randomData(self): size = slice_data.stop - slice_data.start arr = np.random.randint(self.dataLow, self.dataHigh, size, dtype=np.uint8) self.rawdata[slice_data] = bytearray(arr) def new(self): """构造新的数据对象 """ self.rawdata = bytearray(1432) self.rawdata[slice_start_magic] = DATA_START_MAGIC self.rawdata[slice_partial_cnt] = 0x02 self.rawdata[slice_partial_idx] = 0x00 self.rawdata[slice_sample_line] = 0x03 self.rawdata[slice_rgb_extern] = 0x01 self.setLineIdx(0x00) self.setValidDataLen(1280) self.setLineBytes(1432) self.randomData() self.rawdata[slice_end_magic] = DATA_END_MAGIC def hex(self): return self.rawdata.hex() def __repr__(self): return '<CameraData@{} hex len: {}>'.format(hex(id(self)), len(self.rawdata))

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxfdy.html