1、安装依赖

<!-- Eclipse Milo OPC UA client SDK; add this inside the <dependencies> section of pom.xml. -->
<dependency>
            <groupId>org.eclipse.milo</groupId>
            <artifactId>sdk-client</artifactId>
            <version>0.6.14</version>
        </dependency>

2、新建OpcUaConfig配置类

package org.example.opcua.opc_config;

import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;


/**
 * Spring configuration that creates, connects and exposes a single Eclipse Milo
 * {@link OpcUaClient} bean.
 */
@Configuration
public class OpcUaConfig {

    /**
     * Builds an anonymous, unsecured ({@code SecurityPolicy.None}) OPC UA client for the
     * hard-coded endpoint and blocks until the connection is established.
     *
     * <p>{@code destroyMethod = "disconnect"} makes Spring close the OPC UA session when the
     * application context shuts down; Spring only infers {@code close}/{@code shutdown}
     * automatically, so without it the session would be left open on the server.
     *
     * @return the connected client
     * @throws Exception if the security directory cannot be created, no matching endpoint is
     *                   found, or the connection attempt fails
     */
    @Bean(destroyMethod = "disconnect")
    public OpcUaClient createClient() throws Exception {

        String endPointUrl = "opc.tcp://127.0.0.1:49320";

        // Working directory under java.io.tmpdir; it is created here but not otherwise
        // used by this method.
        Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
        Files.createDirectories(securityTempDir);
        if (!Files.exists(securityTempDir)) {
            throw new Exception("unable to create security dir: " + securityTempDir);
        }

        OpcUaClient opcUaClient = OpcUaClient.create(endPointUrl,
                endpoints ->
                        endpoints.stream()
                                // Compare from the constant side so an endpoint that reports
                                // no security policy URI cannot cause a NullPointerException.
                                .filter(e -> SecurityPolicy.None.getUri().equals(e.getSecurityPolicyUri()))
                                .findFirst(),
                configBuilder ->
                        configBuilder
                                .setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
                                .setApplicationUri("urn:eclipse:milo:examples:client")
                                // Authentication mode: anonymous access.
                                .setIdentityProvider(new AnonymousProvider())
                                // Request timeout (milliseconds).
                                .setRequestTimeout(UInteger.valueOf(500))
                                .build()
        );

        // Block until connected so the bean handed to other components is ready to use.
        opcUaClient.connect().get();
        return opcUaClient;
    }
}

3、新建OpcService用于订阅OPCUA数据

package org.example.opcua.opc_service;

import jakarta.annotation.Resource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.Lists.newArrayList;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

/**
 * Subscribes to OPC UA node value changes through the injected {@link OpcUaClient}.
 *
 * <p>As an {@link ApplicationRunner}, {@link #run(ApplicationArguments)} is invoked by Spring
 * Boot after startup and registers the batch subscription.
 */
@Service
@Slf4j
public class OpcService implements ApplicationRunner {

    @Resource
    private OpcUaClient opcUaClient;

    /** Source of client handles for monitored items created by {@link #subscribe(OpcUaClient)}. */
    private static final AtomicInteger atomic = new AtomicInteger();

    /**
     * Subscribes to a single node ({@code ns=2;s=accesstest2.equip1.test}) and prints every
     * value change to standard output.
     *
     * <p>Blocks until both the subscription and its monitored item have been created, so a
     * failure in either step is propagated to the caller instead of being silently dropped.
     *
     * @param client connected client on which the subscription is created
     */
    @SneakyThrows
    public void subscribe(OpcUaClient client) {
        client
                .getSubscriptionManager()
                .createSubscription(1000.0)
                // thenCompose (not thenAccept) so the item-creation future is part of the
                // chain that get() waits on.
                .thenCompose(t -> {
                    // Node to monitor.
                    NodeId nodeId = new NodeId(2, "accesstest2.equip1.test");
                    ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
                    // Monitoring parameters: client handle, sampling interval, filter (null),
                    // queue size, discard-oldest.
                    MonitoringParameters parameters = new MonitoringParameters(
                            UInteger.valueOf(atomic.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true);
                    // Request used to create the monitored item on the subscription.
                    MonitoredItemCreateRequest request =
                            new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
                    List<MonitoredItemCreateRequest> requests = new ArrayList<>();
                    requests.add(request);
                    // Create the monitored item and register the value-change callback.
                    return t.createMonitoredItems(
                            TimestampsToReturn.Both,
                            requests,
                            (item, id) -> item.setValueConsumer((it, val) -> {
                                System.out.println("nodeid :" + it.getReadValueId().getNodeId());
                                System.out.println("value :" + val.getValue().getValue());
                            })
                    );
                })
                .thenAccept(OpcService::reportFailedItems)
                .get();
    }

    /**
     * Batch subscription: creates one subscription with a 1000 ms publishing interval on the
     * injected client and registers a monitored item for each configured node in a single
     * request.
     *
     * @throws Exception if creating the subscription or the monitored items fails
     */
    public void createSubscription() throws Exception {
        // Subscription with a 1000 ms publishing interval.
        UaSubscription subscription = opcUaClient.getSubscriptionManager().createSubscription(1000.0).get();

        // String identifiers (namespace index 2) of the nodes to subscribe to.
        List<String> key = new ArrayList<>();
        key.add("accesstest2.equip1.test");
        key.add("accesstest2.equip1.test1");
        key.add("accesstest2.equip1.test2");

        // Build every request first so all items are created in one server round trip.
        List<MonitoredItemCreateRequest> requests = new ArrayList<>(key.size());
        for (int i = 0; i < key.size(); i++) {
            NodeId nodeId = new NodeId(2, key.get(i));
            ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
            MonitoringParameters parameters = new MonitoringParameters(
                    uint(1 + i),  // client handle; must be unique per item within the subscription
                    0.0,          // sampling interval; NOTE(review): 0 requests the server's fastest rate per OPC UA Part 4 - confirm intended
                    null,         // filter, null means use default
                    uint(10),     // queue size
                    true          // discard oldest
            );
            requests.add(new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters));
        }

        // Create the monitored items and register the value-change callback on each one.
        List<UaMonitoredItem> items = subscription.createMonitoredItems(
                TimestampsToReturn.Both,
                requests,
                (item, id) -> item.setValueConsumer((is, value) -> {
                    String nodeName = String.valueOf(item.getReadValueId().getNodeId().getIdentifier());
                    // String.valueOf tolerates a null value (e.g. a bad-status sample), where
                    // calling toString() directly would throw inside the callback.
                    String nodeValue = String.valueOf(value.getValue().getValue());
                    System.out.println("订阅");
                    System.out.println(nodeName);
                    System.out.println(nodeValue);
                })).get();

        reportFailedItems(items);
    }

    /**
     * Startup hook: registers the batch subscription. Failures are logged rather than
     * rethrown, so they do not abort application startup.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args){
        try {
            System.out.println("执行订阅");
            this.createSubscription();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe it.
            Thread.currentThread().interrupt();
            log.error("interrupted while creating OPC UA subscription", e);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace and cause reach the log.
            log.error("failed to create OPC UA subscription", e);
        }
    }

    /** Logs every monitored item whose creation status reported by the server is not good. */
    private static void reportFailedItems(List<UaMonitoredItem> items) {
        for (UaMonitoredItem item : items) {
            if (!item.getStatusCode().isGood()) {
                log.error("failed to create monitored item for nodeId={} (status={})",
                        item.getReadValueId().getNodeId(), item.getStatusCode());
            }
        }
    }
}

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐