data:image/s3,"s3://crabby-images/e83b4/e83b48bbac783e2b7a1200cd2c8fc51f8adae3ca" alt="cover"
python车辆定位调度管理系统,基于django+twisted
数据处理模块: data_handle文件夹 1、receive_serv.py,数据接收,并解析,解析出来时候放入redis,设备的监听是10808,默认协议是jt808部标通讯协议,目前也只解析了定位和一些基本参数。软件架构说明 主要框架 python: django twisted 数据库:postgresql 前端:layui 进程守护:supervisor web服务器:nginx 中间
·
通过给车辆安装gps定位器,实现车辆的监控管理
软件架构
软件架构说明 主要框架 python: django twisted 数据库:postgresql 前端:layui 进程守护:supervisor web服务器:nginx 中间件:redis
安装和使用教程
web服务器: 1、postgresql数据库新建 car 数据库,也可自行修改setting里面的数据库配置 2、运行python manage.py runserver 有缺少的库,自行安装下。
数据处理模块: data_handle文件夹 1、receive_serv.py,数据接收,并解析,解析出来时候放入redis,设备的监听是10808,默认协议是jt808部标通讯协议,目前也只解析了定位和一些基本参数。 2、store_serv.py,从redis接受解析好的数据,进行状态计算(超速报警,围栏报警等)和存储。
完整程序代码下载地址:python车辆定位调度管理系统
receive_serv.py
#! /usr/bin/python#-*- coding: UTF-8 -*-
# 三基类协议手环手表产品接收模块,
# 在analysis中引入redis,目前主要处理需要分包处理的数据
# Python代码大全,2022.7.1
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor
from twisted.internet.threads import deferToThread
import time
import datetime
import redis
import re
import sys,os
import time
empty_table={'lbs': '0', 'mileage': '0', 'speed': '0', 'othermesg':{}, 'wifi': '0',
'oil_use': '0', 'dirct': '0', 'scanner': '0', 'rssi': '0', 'efence_alarm': '0',
'status': '0', 'remark': '0', 'heart_rate': '0', 'speed_alarm': '0', 'battery': '0',
'blood_pres': '0', 'stop_time': '0','sos_alarm': '0', 'step_count': '0',
'heart_alarm': '0', 'device_id':'0', 'dev_upload': '0', 'obd': '0', 'blood_alarm': '0',
'lng': '0', 'serv_receive': '0', 'track_type': '0', 'location': '0', 'lat': '0', 'hard_verson': '0'}
pool_redis= redis.ConnectionPool(host='localhost',port=6379,decode_responses=True)
mesg_redis=redis.Redis(connection_pool=pool_redis)
def log_write(data):#日志log存储
with open('sys_log/import_analysis_log','a') as f:
f.write(data)
f.write('&&')
#自动加载command文件夹里面的所有command命令文档,存储为字典{厂家型号:命令函数()},
#例如{'vk':lambda device_id,kind,param,model:vk_command(device_id,kind,param,model)}
analysis={}
first_handle={}
def impfile(filename):
filelist,*_ = os.walk(filename,topdown=True)
*_,filenames = filelist
for i in filenames:
print(i)
if i[-3:] == '.py' and i != '__init__.py':
try:
device_model=re.search(r'(?P<model>\w*)_',i).group('model')
exc = 'from '+ filename+'.'+i[:-3] + ' import '+ '*'
exec(exc,globals())
print(i[:-3])
# analysis[device_model]=lambda datas:eval(i[:-3])(datas)#本地可运行,服务器不可
analysis[device_model]=i[:-3]
first_handle[device_model]=device_model+'_first_handle'
except Exception as e:
analysis_log=str([i,e,datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
log_write(analysis_log)
#命令函数目录名
impfile('analysis')
print('analysis',analysis)
print('first_handle',first_handle)
port_and_model={10712:'wz808',10808:'rby808',10809:'jt808'}
# port_and_model={23333:'jt808'}
class Iot_Pro(Protocol):
def __init__(self,tml_id,data_buffer,separate_package):
self.tml_id=tml_id
self.data_buffer=data_buffer
self.separate_package=separate_package
def connectionMade(self):
print("new connect:",str(self.transport.client))
param=mesg_redis.rpop('car_set_param')
print('command_param==>',param)
if param!=None:
param=eval(param)
self.handle_command(param)
def connectionLost(self, reason):
client_num=str(self.transport.client)
if client_num in self.tml_id:
print('lost connected...:',self.tml_id[client_num])
offline_data={'device_id':self.tml_id[client_num],'othermesg':str({'on_off_line':'offline'}),'serv_receive':datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
save_talbe=empty_table.copy()
save_talbe.update(offline_data)
del self.tml_id[self.tml_id[client_num]]
del self.tml_id[client_num]
mesg_redis.rpush('car_gps_data',str(save_talbe))
# print(empty_table)
if client_num in self.data_buffer:
del self.data_buffer[client_num]
def dataReceived(self,origin_data):
#接收数据,并进行初步处理,判断数据的完整性,完整的数据异步分发到下一步的处理模块,根据协议进行解析,不完整的数据,放到相应的缓冲地带
protocol_num=self.transport.socket.getsockname()[1]
print('org_data==>',origin_data)
first_result=eval(first_handle[port_and_model[protocol_num]]+'(origin_data)')
# print('first_result',first_result)
if 'normal_data' in first_result:
first_result=first_result['normal_data']
for f_result in first_result:
deferToThread(self.handle_data,f_result).addCallback(self.write_response)
elif first_result=='buffer_data':
buffer_id=self.transport.client
# print('buffer_id',buffer_id)
# print('self.data_buffer',self.data_buffer)
if buffer_id in self.data_buffer:
if self.data_buffer[buffer_id]==b'':
self.data_buffer[buffer_id]=origin_data
else:
self.data_buffer[buffer_id]+=origin_data
data=self.data_buffer[buffer_id]
buffer_result=eval(first_handle[port_and_model[protocol_num]]+'(data)')
# print('buffer_result',buffer_result)
if 'normal_data' in buffer_result:
self.data_buffer[buffer_id]=b''
for f_result in buffer_result['normal_data']:
deferToThread(self.handle_data,f_result).addCallback(self.write_response)
# 缓冲超过一定的数量,清零处理
elif len(data)>12800000:
self.data_buffer[buffer_id]=b''
else:
print('buffer_data_add')
self.data_buffer[buffer_id]=origin_data
def handle_data(self, data):
#对数据包进行二次处理,主要是第完整的数据包根据设备的协议进行解析,获得设备上传的主要数据
protocol_num=self.transport.socket.getsockname()[1]
client_num=str(self.transport.client)
receive_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
d_done=''
dev_id=self.tml_id.get(client_num)
d_done=eval(analysis[port_and_model[protocol_num]]+'(data,receive_time,protocol_num,mesg_redis,dev_id)')
if 'device_id' in d_done:
self.tml_id[client_num]=d_done['device_id']
self.tml_id[d_done['device_id']]=self
else:
d_done['device_id']=self.tml_id[client_num]
print('d_done==>',d_done)
response=d_done.pop('response')
save_kind=d_done.pop('save_kind')
if save_kind=='yes':
print('send_data_to_redis')
mesg_redis.rpush('car_gps_data',str(d_done))
# print('serv_receive',d_done['serv_receive'])
result={'device_id':d_done['device_id'],'response':response}#执行缓存命令,需要用到设备编号
return result
def write_response(self, result):#数据包的处理结果,并返回数据给客户端
print("data_resule_:",result)
response=result['response']
device_id=result['device_id']
if response!=b'0':
self.transport.write(response)
cache_command=mesg_redis.rpop(device_id+'cache_command')
if cache_command!=None:
cache_command=eval(cache_command)
self.handle_command(cache_command)
def handle_command(self,param):
# 获取该设备的命令执行状态,正在执行命令的情况下,本次的命令转为缓存,等下次设备收到信息再执行
device_id=param['device_id']
command_excuting= mesg_redis.get(device_id+'command_excuting')
# 设备在线,且没有执行其他命令的情况下,执行本次命令,否则缓存该命令,等下次设备收到信息再执行
if device_id in self.tml_id and command_excuting==None:
mesg_redis.set(device_id+'command_excuting','yes',ex=60)
set_result=b'set_param,ok!'
send_data=param['param']
# 针对设备的数据包是分包发送的情况,分包好的数据整体推送到redis,执行的时候按顺序取下来发送给设备。
if isinstance(send_data,dict):
if send_data.get('voice_dict')!=None:
mesg_redis.set(device_id+'voice_dict',str(send_data['voice_dict']),ex=60)
# 发送分包数据的首包
send_data=send_data['command']
else:
set_result=b'voice_rececving'
if set_result==b'set_param,ok!':
self.tml_id[device_id].transport.write(send_data)
command_detail=param['command_detail']
command_name=param['command_name']
# 把命令执行表推送到redis,设备执行完毕的是,在analysis模块取下来更新执行结果
mesg_redis.set(device_id+command_name,str(command_detail),ex=60)
# 回复命令的下发情况
self.transport.write(set_result)
print('set_result====>',set_result)
else:
# 设备不在线的情况下,缓存命令,等上线再执行
mesg_redis.lpush(device_id+'cache_command',str(param))
self.transport.write(b'mission cache!')
class Iot_Fact(Factory):
def __init__(self):
self.tml_id = {}
self.data_buffer = {}
self.separate_package={}
def buildProtocol(self, addr):
return Iot_Pro(self.tml_id,self.data_buffer,self.separate_package)
for port in port_and_model:
reactor.listenTCP(port,Iot_Fact())
reactor.run()
完整程序代码下载地址:python车辆定位调度管理系统
更多推荐
所有评论(0)