92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
import time
|
|
import json
|
|
|
|
import serial
|
|
import serial.tools.list_ports
|
|
from PyQt5.QtWidgets import *
|
|
from PyQt5.QtCore import *
|
|
from PyQt5.QtGui import *
|
|
|
|
|
|
class SerialHandler(QThread):
    """Background thread that reads sensor-node readings from a serial port.

    Each received line is split on '&' and must contain exactly five fields;
    the first four are parsed as node sequence number, humidity, temperature
    and light level. Accepted readings are corrected with the per-node
    offsets stored in ``./config.json``, written to ``./data.json`` and
    emitted through ``sign_node``.
    """

    # Emitted once per accepted reading: (node sequence number, reading dict).
    sign_node = pyqtSignal(int, dict)
    port_name: str

    BAUD_RATE = 115200
    # Bounded read so the loop re-checks ``working`` even when the line is
    # idle; without it ``wait()`` in ``__del__`` could block forever.
    READ_TIMEOUT_S = 1.0
    FIELD_COUNT = 5
    CONFIG_PATH = './config.json'
    DATA_PATH = './data.json'

    def __init__(self, port_name, parent=None):
        """Remember the port name; the port itself is opened in ``run``.

        Args:
            port_name: Name of the serial port handed to ``serial.Serial``.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.port_name = port_name
        self.working = True

    def __del__(self):
        """Ask the read loop to stop and block until the thread has finished."""
        self.working = False
        self.wait()

    def run(self):
        """Read, validate, persist and emit readings until ``working`` is cleared.

        Malformed, duplicate or implausible lines are reported on stdout and
        skipped. The serial port is always closed when the loop ends, whether
        it ends normally or through an exception.
        """
        previous_line = ''  # last well-formed line, used to drop repeats
        self.com = serial.Serial(
            self.port_name, self.BAUD_RATE, timeout=self.READ_TIMEOUT_S)
        try:
            while self.working:
                raw = self.com.readline()
                if not raw:
                    # Read timed out with no data: loop to re-check ``working``.
                    continue

                try:
                    line = raw.decode('utf-8')
                except UnicodeDecodeError:
                    # Line noise must not terminate the reader thread.
                    print('数据错误!')
                    continue

                fields = line.split('&')
                if len(fields) != self.FIELD_COUNT:  # incomplete record
                    print('数据缺失!')
                    continue

                if line == previous_line:  # identical to the previous record
                    continue
                previous_line = line

                try:
                    seq, humi, temp, light = self._parse_fields(fields)
                except ValueError:
                    print('数据错误!')
                    continue

                # Plausibility check on the raw (uncorrected) values.
                if humi > 100 or temp > 50 or light > 100000:
                    print('数据不合理!')
                    continue

                try:
                    humi, temp, light = self._apply_offsets(
                        seq, humi, temp, light)
                except KeyError:
                    # No offsets configured for this node: skip the reading
                    # instead of letting the exception kill the thread.
                    print('配置缺失!')
                    continue

                print(f"节点:{seq} 湿度:{humi}% 温度:{temp}°C 光照度:{light}lx")

                self._store_reading(seq, humi, temp, light)

                node_data = {
                    "seq": seq,
                    "humi": humi,
                    "temp": temp,
                    "light": light,
                }
                self.sign_node.emit(seq, node_data)
        finally:
            self.com.close()

    @staticmethod
    def _node_key(seq: int) -> str:
        """Return the JSON key used for node *seq* in both JSON files."""
        return 'node' + str(seq)

    @staticmethod
    def _parse_fields(fields) -> tuple:
        """Convert the first four text fields to ``(seq, humi, temp, light)``.

        The three measurements are rounded to one decimal place.

        Raises:
            ValueError: If any of the fields is not a valid number.
        """
        seq = int(fields[0])
        humi = round(float(fields[1]), 1)
        temp = round(float(fields[2]), 1)
        light = round(float(fields[3]), 1)
        return seq, humi, temp, light

    def _apply_offsets(self, seq: int, humi: float, temp: float,
                       light: float) -> tuple:
        """Add the node's configured offsets and round to two decimals.

        The configuration file is re-read on every call, as the original
        loop did, so edits to it take effect without restarting the thread.

        Raises:
            KeyError: If the node, or one of its offsets, is not configured.
        """
        with open(self.CONFIG_PATH, 'r', encoding='utf-8') as fp:
            config = json.load(fp)
        offsets = config[self._node_key(seq)]
        return (
            round(humi + offsets['humi'], 2),
            round(temp + offsets['temp'], 2),
            round(light + offsets['light'], 2),
        )

    def _store_reading(self, seq: int, humi: float, temp: float,
                       light: float) -> None:
        """Update the node's entry and the timestamp in the data file."""
        with open(self.DATA_PATH, 'r', encoding='utf-8') as fp:
            data = json.load(fp)

        # Create the node entry when the data file does not know it yet.
        node = data.setdefault(self._node_key(seq), {})
        node['humi'] = humi
        node['temp'] = temp
        node['light'] = light
        data['time'] = time.strftime('%Y-%m-%d %H:%M:%S')

        # Written with the same encoding the file is read with.
        with open(self.DATA_PATH, 'w', encoding='utf-8') as fp:
            json.dump(data, fp)
|