监听串口,接收它们发过来的数据,进行处理。

一、概况

前不久做的一个项目,需要读取水下传感器的数据。这些传感器通过串口与外界交互。我们写了一个java程序,接收传感器传送的数据,同时也下发命令,控制部分传感器。

二、运行环境

(一)硬件环境

串口的话,一般台式机主板有1、2个串口,如果只有2台传感器,那么通过串口线与台式机连接起来,就能直接访问了。
在这里插入图片描述

1、串口线

在这里插入图片描述

2、串口服务器

但如果有许多传感器,怎么办呢?这时候需要用到串口服务器,比如8口的串口服务器,同时接8个传感器,然后串口服务器通过网线与应用服务器相连。而在应用服务器,通过串口软件,将串口服务器的8个串口映射到本机。这样,就相当于8个传感器直接连到了应用服务器一样,应用服务器上的程序就能直接访问这8个传感器了。

在这里插入图片描述
串口服务器连接8个传感器的接口,既可以是串口,也可以是经过转换线后接入网口。
在这里插入图片描述

3、应用服务器映射串口服务器的串口

在这里插入图片描述

(二)串口软件

1、映射串口服务器的串口到本地

前面说过,要用串口软件将串口服务器的8个串口映射到本地。要达成这个效果,
1)首先要安装串口服务器的驱动程序
2)串口服务器提供的软件,将8个串口映射到本地
以moxa为例:
在这里插入图片描述
3)访问串口服务器提供的WEB管理页面,对每个串口的参数,如波特率等进行设置。这种设置,主要是针对传感器,假设每款传感器固定连接到某个串口。

2、模拟数据

使用串口设备似乎有个好处,就是有一些类似串口助手之类的软件,可以模拟向指定串口发送数据,利于测试。
在这里插入图片描述
在这里插入图片描述

三、代码

我们使用JAVA来接收数据和写入指令。

1、代码结构

代码文件的名字与其真正用途有点对应不上,主要是对系统的理解一直有变化,而文件名却基本保留了下来,二者不同步导致。
在这里插入图片描述

在这里插入图片描述

代码讲解

1、开始程序

开始程序主要是执行初始化工作,获取服务器中可用的串口。串口是独占的,如果有别的进程在用,那我们的程序就用不了。需要先中止别的进程,释放资源才行。

Receiver.java

public class Receiver implements ApplicationRunner {
    private List<PortReader> readerList = new ArrayList<PortReader>();//设备数据接收器,有多个,所以用数组
    private List<PortEx> curPortList = new ArrayList<>();//当前活跃端口。用于检查端口变化
    private PortSaver portSaver = null;//接收到的数据输出或转储处理器,只有1个,集中处理
    public static BlockingQueue<DataRow> queue = new LinkedBlockingQueue<>();//接收到的数据处理队列

    @Autowired
    @Qualifier("redisService1")
    RedisService redisService;

    //相关数据库表操作类
    @Autowired
    PortService portService;

    //以下为传感器类
    @Autowired
    PortCtdService ctdService;
    @Autowired
    PortCo2Service co2Service;
    @Autowired
    PortAdcpEquipService adcpEquipService;
    @Autowired
    PortAdcpCurrentService adcpCurrentService;
    @Autowired
    PortAdcpWaveService adcpWaveService;
    @Autowired
    PortAdcpInstrumentService adcpInstrumentService;
    @Autowired
    PortHydroService hydroService;

    public static void main(String[] args) {
        Receiver rec = new Receiver();
        rec.init();
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        System.out.println("start init reader ");
        init();
    }

    @PreDestroy
    public void exit() {
        System.out.println("Reader go home when system exit");
        for (PortReader reader : readerList) {
            if (reader != null) {
                reader.stopRead();
            }
        }
    }

    private void init() {
        List<PortEx> ports = getAvailablePorts();//获取系统中可用的端口
        go(ports);

        portSaver = new PortSaver(queue, 
            redisService,
            ctdService,co2Service, adcpEquipService,adcpCurrentService,adcpWaveService,adcpInstrumentService,hydroService);//将各款传感器类都传入,方便集中处理
        portSaver.start();
    }
    private List<PortEx> getAvailablePorts() {//获取系统中可用的端口
        List<PortEx> availablePorts = new ArrayList<>();

        List<PortEx> ports = portService.queryByPageEx(null, PageRequest.of(0, 1000));

        Enumeration portList = CommPortIdentifier.getPortIdentifiers();
        while (portList.hasMoreElements()) {
            CommPortIdentifier portId = (CommPortIdentifier) portList.nextElement();
            if (portId.getPortType() == CommPortIdentifier.PORT_SERIAL) {
                for (PortEx port : ports) {
                    if (portId.getName().equals(port.getPortId())) {
                        port.setCommPortIdentifier(portId);
                        availablePorts.add(port);
                        break;
                    }
                }
            }
        }

        return availablePorts;
    }
    private void go(List<PortEx> ports) {
        readerList.clear();
        curPortList.clear();

        for (PortEx port : ports) {
            if(!port.getSensorCategory().equals("LIGHT")){//如果是不用写入指令的设备。。。
                PortReader read = new PortReader(port.getCommPortIdentifier(), port, queue);
                read.setDaemon(true);//设置守护进程
                read.start();
                readerList.add(read);
                curPortList.add(port);
            }else{
                PortWriter writer = new PortWriter(port.getCommPortIdentifier(), port, DeviceManager.deviceMap);
                writer.setDaemon(true);
                writer.start();
            }
        }
    }
}

2、串口设备对象类

串口设备对象类,初始化串口,开启串口监听,接收设备信息流,都在这里完成。

Device.java

/**
 * 串口设备对象类
 * 初始化串口,开启串口监听,接收设备信息流,都在这里完成
 */
public class Device implements SerialPortEventListener {

    private PortEx port = null;//自定义的串口实体,包括端口号,波特率等等
    private SerialPort serialPort = null;
    private CommPortIdentifier portId = null;
    private InputStream inputStream;
    private OutputStream outputStream;

    private BlockingQueue<DataRow> queue = null;

    private Map<String,String> _temps = new HashMap<>();//用于暂存接收到的字符串,积攒到完整一条记录后才填入消息队列进行处理

    public Device(CommPortIdentifier Id, PortEx p, BlockingQueue<DataRow> queue) {
        try {
            this.port = p;
            this.portId = Id;
            this.queue = queue;
            serialPort = (SerialPort) portId.open(port.getPortName(), 2000);
            //设置波特率、数据位、停止位、检验位
            serialPort.setSerialPortParams(port.getBaudRate(),
                    port.getDataBits(),
                    port.getStopBits(),
                    port.getParity());
            //获取输入流
            inputStream = serialPort.getInputStream();
            outputStream = serialPort.getOutputStream();

            //设置串口监听
            serialPort.addEventListener(this);
            //设置开启监听
            serialPort.notifyOnDataAvailable(true);

            System.out.println("已初始化端口:" + portId.getName());
        } catch (PortInUseException e) {
            System.out.println(String.format("%s正在使用!", portId.getName()));
        } catch (TooManyListenersException e) {
            e.printStackTrace();
        } catch (UnsupportedCommOperationException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void open() {
        if(serialPort != null)  {
            System.out.println(String.format("%s打开中...",portId.getName()));
        } else {
            System.out.println(String.format("%s打开失败",portId.getName()));
        }
    }
    public boolean close() {
        try {
            serialPort.close();
        } catch(Exception ex){
            System.err.println(ex.getCause());
            return false;
        }
        try {
            inputStream.close();
        } catch (Exception ex) {
            System.err.println(ex.getCause());
            return false;
        }
        try {
            outputStream.close();
        } catch (Exception ex) {
            System.err.println(ex.getCause());
            return false;
        }
        System.out.println(String.format("%s关闭...%b",port.getPortName(),portId.isCurrentlyOwned()));
        return true;
    }    

    /**
     * 监听函数
     */
    public void serialEvent(SerialPortEvent serialPortEvent) {
        switch (serialPortEvent.getEventType()) {
            //获取到有效信息
            case SerialPortEvent.DATA_AVAILABLE:
                on_data_available();
                break;
            default:
                System.out.println("不知所谓");
                break;
        }
    }
    public void write(byte[] cmd) {//写入指令到设备
        try {
            outputStream.write(cmd);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * 读取串口信息
     */
    private void on_data_available() {
        if (inputStream != null) {
            try {
                int len = inputStream.available();
                byte[] readBuffer = new byte[len];
                len = inputStream.read(readBuffer);
                String txtData = new String(readBuffer, 0, len).trim();
                read(txtData);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private void read(String txtData) {
        if (txtData != null && txtData.length() > 0) {
            String category = port.getSensorCategory();

            String keeper = "";
            if(_temps.containsKey(category)){
                keeper = _temps.get(category);
            }

            String[] lines = txtData.split("\r");
            for(int i = 0;i < lines.length;i++){
                String line = lines[i];
                keeper += line.trim();
                if(DataUtil.iswhole(category,keeper)){
                    sendIt(category,keeper);//信息处理。。。
                    keeper = "";
                }
                if(i < lines.length - 1) {//本line与下一个line之间有换行符。意味着已经结束
                    keeper = "";
                }
                _temps.put(category,keeper);
            }
        }
    }
    private void sendIt(String category,String line){
        try {
            DataRow row = new DataRow();

            //将数据写入row...
            。。。

            queue.put(row);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

3、串口设备管理器

读取数据的串口设备管理器,在这里操作Device对象。

PortReader.java

/**
 * 读取数据的串口设备管理器
 * 
 */
public class PortReader extends Thread {
    private CommPortIdentifier portId = null;
    private PortEx port = null;
    private Device device = null;
    private BlockingQueue<DataRow> queue = null;

    public PortReader(CommPortIdentifier id, PortEx p, BlockingQueue<DataRow> queue) {
        super();
        this.portId = id;
        this.queue = queue;
        this.port = p;
    }

    public void run() {
        device = new Device(portId, port, queue);
        device.open();
        DeviceManager.deviceMap.put(port.getPortId(), device);
    }

    public void stopRead() {
        if(device != null) device.close();
        System.out.println(getName() + " stop success when reader go home");
    }

    public PortEx getPort(){
        return this.port;
    }
}

4、数据处理类

将从消息队列中接收到的设备数据进行处理。

PortSaver.java

/**
 * 将读取到的设备数据输出或转储处理器
 */
public class PortSaver extends Thread {
    private BlockingQueue<DataRow> queue = null;
    private DataRow dataRow = null;
    private RedisService redisService;

    //以下为各传感器类
    private PortCtdService ctdService;
    private PortCo2Service co2Service;
    private PortAdcpEquipService adcpEquipService;
    private PortAdcpCurrentService adcpCurrentService;
    private PortAdcpWaveService adcpWaveService;
    private PortAdcpInstrumentService adcpInstrumentService;
    private PortHydroService hydroService;

    public PortSaver(BlockingQueue<DataRow> queue, RedisService redisService,
                     PortCtdService ctdService,
                     PortCo2Service co2Service,
                     PortAdcpEquipService adcpEquipService,
                     PortAdcpCurrentService adcpCurrentService,
                     PortAdcpWaveService adcpWaveService,
                     PortAdcpInstrumentService adcpInstrumentService,
                     PortHydroService hydroService) {
        super();
        this.queue = queue;
        this.redisService = redisService;

        this.ctdService = ctdService;
        this.co2Service = co2Service;
        this.adcpEquipService = adcpEquipService;
        this.adcpCurrentService = adcpCurrentService;
        this.adcpWaveService = adcpWaveService;
        this.adcpInstrumentService = adcpInstrumentService;
        this.hydroService = hydroService;
    }

    public void run() {
        if (queue != null) {
            while (true) {
                if (queue.size() > 0) {
                    try {
                        dataRow = queue.take();

                        System.out.println(dataRow.getMessage());

                        String[] values = dataRow.getMessage().trim().split(",");
                        String category = dataRow.getCategory();//串口类型
                        int sensorId = dataRow.getSensorId();
                        Object obj = DataUtil.getEntity(category, values);
                        dealIt(category, sensorId, obj);
                    } catch (Exception ex) {
                        // TODO Auto-generated catch block
                        System.err.println(ex.getCause());
                    }
                }
            }
        }
    }

    public void stopSave() {
        System.out.println(getName() + " stop success when reader go home");
    }

    private Map<String,PortAdcpCurrent> adcpCurrents = new HashMap<>();
    private void dealIt(String category, int sensorId, Object obj) {
        if (obj == null) return;

        //判断category对应的传感器类型,分别处理其数据
        。。。
    }

    。。。
}

5、队列

代码中使用了队列,用于处理接收到的传感器数据。

BlockingQueue<DataRow> 是Java中的一种数据结构,它是Java并发库(java.util.concurrent)的一部分,主要用于多线程间的生产者-消费者模式。它是一个线程安全的队列,能够阻塞插入和移除操作。

1)在项目的启动程序设置一个静态的队列

首先,在项目的启动程序设置一个静态的队列,然后将该队列注入到串口设备管理器(PortReader)和串口数据处理器(PortSaver)

@Component
@Order(2)
@EnableScheduling
public class Receiver implements ApplicationRunner {
    private List<PortReader> readerList = new ArrayList<PortReader>();
    private PortSaver portSaver = null;
    
    //设置一个静态的队列
    public static BlockingQueue<DataRow> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        Receiver rec = new Receiver();
        rec.init();
    }

    private void init() {
		。。。
        portSaver = new PortSaver(queue, 。。。);
        portSaver.start();
    }
    
    private void go(List<PortEx> ports) {
		。。。
        for (PortEx port : ports) {
	        PortReader read = new PortReader(port.getCommPortIdentifier(), port, queue);
	        read.setDaemon(true);//设置守护进程
	        read.start();
			。。。
        }
    }
}

2)生产者:串口设备管理器(PortReader)

串口设备管理器(PortReader)中,将注入的队列再注入到串口设备对象类(Device)

public class PortReader extends Thread {
    private Device device = null;
    private BlockingQueue<DataRow> queue = null;

    public PortReader(CommPortIdentifier id, PortEx p, BlockingQueue<DataRow> queue) {
        super();
        this.portId = id;
        this.queue = queue;
        this.port = p;
    }

    public void run() {
        device = new Device(portId, port, queue);
        device.open();
        DeviceManager.deviceMap.put(port.getPortId(), device);
    }

	。。。
}

/**
 * 串口设备对象
 */
public class Device implements SerialPortEventListener {
    private BlockingQueue<DataRow> queue = null;

    public Device(CommPortIdentifier Id, PortEx p, BlockingQueue<DataRow> queue) {
         this.queue = queue;
         。。。
    }
	
	。。。

    private void sendIt(String category,String line){
        try {
            DataRow row = new DataRow();

			//将读到的内容装配到row
            row.setRecTime(new Date());
            row.setMessage(line);
			。。。

            queue.put(row);//写入队列
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

3)消费者:串口数据处理器(PortSaver)

public class PortSaver extends Thread {
    private BlockingQueue<DataRow> queue = null;
	。。。
	
    public PortSaver(BlockingQueue<DataRow> queue, 。。。) {
        super();
        this.queue = queue;
		。。。
    }

    public void run() {
        if (queue != null) {
            while (true) {
                if (queue.size() > 0) {
                    try {
                        dataRow = queue.take();
						//处理dataRow
						。。。
                    } catch (Exception ex) {
                        // TODO Auto-generated catch block
                        System.err.println(ex.getCause());
                    }
                }
            }
        }
    }
    
}

4)这个队列不是消息队列

从代码中可以看出,这个队列(BlockingQueue<>)不是一个消息队列,消费的时候,它只能在无限循环中不断检查有无新的数据到来:

while (true) {
    if (queue.size() > 0) {
        try {
            dataRow = queue.take();
			//处理dataRow
			。。。
        } catch (Exception ex) {
            // TODO Auto-generated catch block
            System.err.println(ex.getCause());
        }
    }
}

所以说它功能有限,只用在简单起见的场合。

四、小结

我们项目中用到的传感器,发送数据有一定频率,比如每分钟发一笔数据。每笔数据有固定的格式。但是,每笔数据有可能分为好几次发送,断断续续,所以处理时要注意。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐