spring boot 连接订阅OPCUA数据
spring boot 连接订阅OPCUA数据
·
1、安装依赖
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
<version>0.6.14</version>
</dependency>
2、新建OpcUaConfig配置类
package org.example.opcua.opc_config;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@Configuration
public class OpcUaConfig {
@Bean
public OpcUaClient createClient() throws Exception {
String endPointUrl = "opc.tcp://127.0.0.1:49320";
Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
Files.createDirectories(securityTempDir);
if (!Files.exists(securityTempDir)) {
throw new Exception("unable to create security dir: " + securityTempDir);
}
OpcUaClient opcUaClient = OpcUaClient.create(endPointUrl,
endpoints ->
endpoints.stream()
.filter(e -> e.getSecurityPolicyUri().equals(SecurityPolicy.None.getUri()))
.findFirst(),
configBuilder ->
configBuilder
.setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
.setApplicationUri("urn:eclipse:milo:examples:client")
//访问方式
.setIdentityProvider(new AnonymousProvider())
.setRequestTimeout(UInteger.valueOf(500))
.build()
);
opcUaClient.connect().get();
// Thread.sleep(2000); // 线程休眠一下再返回对象,给创建过程一个时间。
return opcUaClient;
}
}
3、新建OpcService用于订阅OPCUA数据
package org.example.opcua.opc_service;
import jakarta.annotation.Resource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Lists.newArrayList;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
@Service
@Slf4j
public class OpcService implements ApplicationRunner {
@Resource
private OpcUaClient opcUaClient;
private static final AtomicInteger atomic = new AtomicInteger();
//单个订阅
@SneakyThrows
public void subscribe(OpcUaClient client) {
client
.getSubscriptionManager()
.createSubscription(1000.0)
.thenAccept(t -> {
//节点
NodeId nodeId = new NodeId(2,"accesstest2.equip1.test");
ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
//创建监控的参数
MonitoringParameters parameters = new MonitoringParameters(UInteger.valueOf(atomic.getAndIncrement()), 1000.0, null, UInteger.valueOf(10), true);
//创建监控项请求
//该请求最后用于创建订阅。
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
requests.add(request);
//创建监控项,并且注册变量值改变时候的回调函数。
t.createMonitoredItems(
TimestampsToReturn.Both,
requests,
(item, id) -> item.setValueConsumer((it, val) -> {
System.out.println("nodeid :" + it.getReadValueId().getNodeId());
System.out.println("value :" + val.getValue().getValue());
})
);
}).get();
}
/**
* 批量订阅
* @throws Exception
*/
public void createSubscription() throws Exception {
// 获取OPC UA服务器的数据
//创建监控项请求
//创建发布间隔1000ms的订阅对象
UaSubscription subscription = opcUaClient.getSubscriptionManager().createSubscription(1000.0).get();
// 你所需要订阅的key
List<String> key = new ArrayList<>();
key.add("accesstest2.equip1.test");
key.add("accesstest2.equip1.test1");
key.add("accesstest2.equip1.test2");
for (int i = 0; i < key.size(); i++) {
String node = key.get(i);
//创建订阅的变量
NodeId nodeId = new NodeId(2, node);
ReadValueId readValueId = new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
//创建监控的参数
MonitoringParameters parameters = new MonitoringParameters(
uint(1 + i), // 为了保证唯一性,否则key值一致
0.0, // sampling interval
null, // filter, null means use default
uint(10), // queue size
true // discard oldest
);
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
//创建监控项,并且注册变量值改变时候的回调函数。
List<UaMonitoredItem> items = subscription.createMonitoredItems(
TimestampsToReturn.Both,
newArrayList(request),
(item, id) -> {
item.setValueConsumer((is, value) -> {
String nodeName = item.getReadValueId().getNodeId().getIdentifier().toString();
String nodeValue = value.getValue().getValue().toString();
System.out.println("订阅");
System.out.println(nodeName);
System.out.println(nodeValue);
});
}).get();
}
}
@Override
public void run(ApplicationArguments args){
try {
//创建发布间隔1000ms的订阅对象
System.out.println("执行订阅");
this.createSubscription();
}catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
更多推荐
已为社区贡献18条内容
所有评论(0)