Spring Boot 集成 RabbitMQ、Redis、MQTT(Mosquitto)和 ActiveMQ

一、添加依赖:在 pom.xml 中加入以下依赖

                
                <!--添加 rabbitmq 的依赖-->
                 <dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>http-client</artifactId>
			<version>${rabbitmq.http-client}</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<!-- 引入redis依赖 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<!-- 缓存的依赖 -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-cache</artifactId>
		</dependency>
                <!--mqtt依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>

          <!--添加activemq的依赖-->
            <dependency>  
                 <groupId>org.springframework.boot</groupId>  
                 <artifactId>spring-boot-starter-activemq</artifactId>  
             </dependency>
      
             <dependency>
          <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.8</version>
          </dependency>

二、配置自动配置类

1.编写 RabbitConfig 配置类

package com.devframe.common.config;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpManagementOperations;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitManagementTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ infrastructure beans: the connection factory, an admin for declaring
 * broker objects, a JSON-converting template, a management client and the
 * {@code mqttQueue} queue definition.
 *
 * @author DuanZhaoXu
 * @since 2019-01-05
 */
@Configuration
public class RabbitConfig {

	/**
	 * Creates the caching connection factory. Its properties are bound from the
	 * {@code spring.rabbitmq} configuration prefix.
	 *
	 * @return the connection factory shared by the other beans in this class
	 */
	@Bean
	@ConfigurationProperties(prefix = "spring.rabbitmq")
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory cachingFactory = new CachingConnectionFactory();
		return cachingFactory;
	}

	/**
	 * Exposes a {@link RabbitAdmin} built on {@link #connectionFactory()}.
	 *
	 * @return the admin used to declare and delete exchanges, queues and bindings
	 */
	@Bean
	public AmqpAdmin AmqpAdmin() {
		ConnectionFactory factory = connectionFactory();
		return new RabbitAdmin(factory);
	}

	/**
	 * Builds a {@link RabbitTemplate} whose payloads are converted with
	 * {@link Jackson2JsonMessageConverter}.
	 *
	 * @return the JSON-converting template
	 */
	@Bean
	public RabbitTemplate rabbitTemplate() {
		RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory());
		Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
		jsonTemplate.setMessageConverter(jsonConverter);
		return jsonTemplate;
	}

	/**
	 * Creates the client for the RabbitMQ management endpoint.
	 *
	 * <p>NOTE(review): the endpoint URL, user name and password are hard-coded
	 * here; consider moving them to externalized configuration.
	 *
	 * @return the management operations client
	 */
	@Bean
	public AmqpManagementOperations amqpManagementOperations() {
		return new RabbitManagementTemplate("http://192.168.19.200:15672", "admin", "admin@123");
	}

	/**
	 * Defines the {@code mqttQueue} queue.
	 *
	 * @return a durable, non-exclusive, non-auto-delete queue definition
	 */
	@Bean
	public Queue mqttQueue() {
		final boolean durable = true;
		final boolean exclusive = false;
		final boolean autoDelete = false;
		return new Queue("mqttQueue", durable, exclusive, autoDelete);
	}
}

 2.编写 RedisConfig配置类

package com.devframe.common.config;

import java.io.Serializable;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * Redis configuration that registers a {@link RedisTemplate} with String keys
 * and JSON-serialized values.
 */
@Configuration
public class RedisConfig {

    /**
     * Builds the template on top of the supplied connection factory.
     *
     * @param redisConnectionFactory connection factory the template talks through
     * @return a template using {@link StringRedisSerializer} for keys and
     *         {@link GenericJackson2JsonRedisSerializer} for values
     */
    @Bean
    public RedisTemplate<String, Serializable> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Serializable> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        StringRedisSerializer keySerializer = new StringRedisSerializer();
        GenericJackson2JsonRedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
        template.setKeySerializer(keySerializer);
        template.setValueSerializer(valueSerializer);
        return template;
    }
}

 3.编写 MqttSenderConfig 配置类 

https://docs.spring.io/spring-integration/docs/5.1.1.RELEASE/reference/html/mqtt.html#mqtt-inbound

package com.devframe.common.config;

import java.util.Arrays;
import java.util.List;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;


/**
 * Spring Integration wiring for MQTT.
 *
 * <p>Outbound: messages placed on {@code mqttOutboundChannel} are published by
 * {@link #mqttOutbound()}. Inbound: {@link #inbound()} subscribes to the
 * configured topics and forwards received messages to {@code mqttInputChannel},
 * where {@link #handler()} consumes them.
 */
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

    /** Header read by {@link #handler()} to learn which topic a message arrived on. */
    private static final String RECEIVED_TOPIC_HEADER = "mqtt_receivedTopic";

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    // The client id is derived from the start-up time instead of being read from
    // configuration (the spring.mqtt.client.id binding is intentionally disabled).
    //@Value("${spring.mqtt.client.id}")
    private String clientId = String.valueOf(System.currentTimeMillis());

    @Value("${spring.mqtt.default.topic}")
    private String defaultTopic;

    /** Comma-separated list of topics the inbound adapter subscribes to. */
    @Value("#{'${spring.mqtt.topics}'.split(',')}")
    private List<String> topics;

    /** Comma-separated QoS levels; either one value for all topics or one per topic. */
    @Value("#{'${spring.mqtt.qosValues}'.split(',')}")
    private List<Integer> qosValues;

    /**
     * Builds the connection options shared by the outbound and inbound clients.
     *
     * @return options carrying credentials, server URI and timing settings
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[] {hostUrl});
        // NOTE(review): a keep-alive of 2 is very short if the unit is seconds — confirm intended.
        mqttConnectOptions.setKeepAliveInterval(2);
        // Connection timeout, in seconds.
        mqttConnectOptions.setConnectionTimeout(10);
        // NOTE(review): very large in-flight limit; verify the client does not
        // pre-allocate storage proportional to this value.
        mqttConnectOptions.setMaxInflight(100000000);
        return mqttConnectOptions;
    }

    /**
     * Creates the Paho client factory configured with {@link #getMqttConnectOptions()}.
     *
     * @return the client factory used by both adapters
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Outbound handler: publishes whatever arrives on {@code mqttOutboundChannel},
     * asynchronously, non-retained, to {@code defaultTopic} unless the message
     * specifies otherwise.
     *
     * @return the publishing message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultRetained(false);
        return messageHandler;
    }

    /** @return the channel feeding {@link #mqttOutbound()} */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** @return the channel that carries messages received from the broker */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter: subscribes to the configured topics with the configured
     * QoS levels and forwards received messages to {@link #mqttInputChannel()}.
     *
     * <p>Previously the QoS list was parsed but never handed to the adapter, so
     * the configured values had no effect; they are now applied via
     * {@code setQos}.
     *
     * @return the message-driven channel adapter
     * @throws IllegalStateException if the number of QoS values is neither 1 nor
     *         equal to the number of topics
     */
    @Bean
    public MessageProducer inbound() {
        String[] topicArray = topics.toArray(new String[0]);
        int[] qos = resolveQos(topicArray.length);

        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(), topicArray);
        adapter.setQos(qos);
        adapter.setCompletionTimeout(3000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Consumer for {@code mqttInputChannel}. Extracts the received topic and its
     * last path segment; messages without a topic header are ignored instead of
     * failing with a {@link NullPointerException}.
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                Object receivedTopic = message.getHeaders().get(RECEIVED_TOPIC_HEADER);
                if (receivedTopic == null) {
                    return;
                }
                String topic = receivedTopic.toString();
                // Last segment of the topic path (the whole topic when it has no '/').
                String type = topic.substring(topic.lastIndexOf("/") + 1);
                // Placeholder: dispatch on `type` / message.getPayload() here.
            }
        };
    }

    /** Expands the configured QoS list to exactly one value per subscribed topic. */
    private int[] resolveQos(int topicCount) {
        int[] configured = qosValues.stream().mapToInt(Integer::intValue).toArray();
        if (configured.length == topicCount) {
            return configured;
        }
        if (configured.length == 1) {
            int[] expanded = new int[topicCount];
            Arrays.fill(expanded, configured[0]);
            return expanded;
        }
        throw new IllegalStateException("spring.mqtt.qosValues has " + configured.length
                + " value(s) but spring.mqtt.topics has " + topicCount
                + " topic(s); provide one QoS for all topics or one per topic");
    }

}

4.编写  MqttGateway 发送消息的接口

package com.devframe.common.config;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Messaging gateway for publishing to MQTT. Every call is sent as a message to
 * the {@code mqttOutboundChannel} request channel.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

	/**
	 * Sends a payload without a topic header; presumably the outbound handler's
	 * default topic applies — verify against the handler configuration.
	 *
	 * @param data message payload
	 */
	void sendToMqtt(String data);
	/**
	 * Sends a payload with an explicit topic header.
	 *
	 * @param payload message payload
	 * @param topic   value of the {@code MqttHeaders.TOPIC} header
	 */
	void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
	/**
	 * Sends a payload with explicit topic and QoS headers. Note the argument
	 * order differs from the two-argument overload: topic comes first here.
	 *
	 * @param topic   value of the {@code MqttHeaders.TOPIC} header
	 * @param qos     value of the {@code MqttHeaders.QOS} header
	 * @param payload message payload
	 */
	void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

在其他需要通过 MQTT 发送消息的地方,直接 @Autowired 注入该接口即可发送消息。

5、编写 activemq 的 配置类

package com.devframe.common.config;

import javax.jms.Queue;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

/**
 * ActiveMQ configuration: the queue and topic destinations, a pooled connection
 * factory, and one JMS listener container factory per messaging domain.
 */
@Configuration
public class ActivemqConfig {

    // Destination names are fixed constants here; binding them from
    // spring.queueName / spring.topicName is intentionally disabled.
    // They name the destinations that messages reported by farm machinery are forwarded to.
    public static final String queueName = "mqttQueue";

    public static final String topicName = "mqttTopic";

    @Value("${spring.activemq.user}")
    private String brokerUser;

    @Value("${spring.activemq.password}")
    private String brokerPassword;

    @Value("${spring.activemq.broker-url}")
    private String brokerAddress;

    /** @return the queue destination named by {@link #queueName} */
    @Bean
    public Queue queue() {
        ActiveMQQueue destination = new ActiveMQQueue(queueName);
        return destination;
    }

    /** @return the topic destination named by {@link #topicName} */
    @Bean
    public Topic topic() {
        ActiveMQTopic destination = new ActiveMQTopic(topicName);
        return destination;
    }

    /**
     * Pooled connection factory wrapping a plain ActiveMQ connection factory.
     * The original author notes that pooling is needed under high concurrency,
     * because sending many messages over the non-pooled factory raised errors.
     *
     * @return the pooled connection factory
     */
    @Bean
    public PooledConnectionFactory pooledConnectionFactory() {
        ActiveMQConnectionFactory rawFactory =
                new ActiveMQConnectionFactory(brokerUser, brokerPassword, brokerAddress);
        return new PooledConnectionFactory(rawFactory);
    }

    /**
     * Listener container factory that leaves the pub/sub flag at its default.
     *
     * @return the container factory backed by {@link #pooledConnectionFactory()}
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue() {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(pooledConnectionFactory());
        return containerFactory;
    }

    /**
     * Listener container factory for publish/subscribe; consumers select it by
     * bean name.
     *
     * @return the container factory with the pub/sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic() {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        // Switch from the default domain to publish/subscribe.
        containerFactory.setPubSubDomain(true);
        containerFactory.setConnectionFactory(pooledConnectionFactory());
        return containerFactory;
    }
}

 

 

三、AmqpTemplate、RedisTemplate、MqttGateway、JmsMessagingTemplate 的使用

 1.AmqpTemplate 的使用

package com.devframe.controller;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpManagementOperations;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.devframe.entity.OperatEntity;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

/**
 * Demo REST endpoints that exercise RabbitMQ through {@link AmqpTemplate},
 * {@link AmqpAdmin} and {@link AmqpManagementOperations}: sending to exchanges
 * and queues, and creating/deleting queues and exchanges at runtime.
 *
 * <p>All endpoints are POST-only and return the literal {@code "success"}.
 */
@RestController
@Api(tags = "RabbitServerController rabbitmq测试controller")
public class RabbitServerController {

	@Autowired
	private AmqpTemplate amqpTemplate;

	@Autowired
	private AmqpManagementOperations amqpManagementOperations;

	@Autowired
	private AmqpAdmin amqpAdmin;

	/**
	 * Sends {@code msg} (or {@code "hello world"} when it is empty) to exchange
	 * {@code mq.asdfExChange} with routing key {@code mq.asdf.send}, then prints
	 * the names of all queues reported by the management API.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   message text
	 * @return {@code "success"}
	 */
	@RequestMapping(value = "/sendMsg", method = RequestMethod.POST)
	public String sendAmqbMsg(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
		if (model != null && !"".equals(msg)) {
			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", msg);
		} else {
			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf.send", "hello world");
		}
		amqpManagementOperations.getQueues().forEach(x -> {
			System.out.println(x.getName());
		});

		return "success";
	}

	/**
	 * Sends a fixed text to exchange {@code mq.asdfExChange} with routing key
	 * {@code mq.asdf2.send}. {@code msg} only selects which fixed text is sent;
	 * it is not forwarded itself.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   selector between the two fixed texts
	 * @return {@code "success"}
	 */
	@RequestMapping(value = "/sendMsg2", method = RequestMethod.POST)
	public String sendAmqbMsg2(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
		if (model != null && !"".equals(msg)) {
			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙!!!");
		} else {
			amqpTemplate.convertAndSend("mq.asdfExChange", "mq.asdf2.send", "这个世界很奇妙");
		}
		return "success";
	}

	/**
	 * Sends a fixed text to exchange {@code mq.qwerExChange} with routing key
	 * {@code mq.qwer.send}. {@code msg} only selects which fixed text is sent.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   selector between the two fixed texts
	 * @return {@code "success"}
	 */
	@RequestMapping(value = "/sendMsg3", method = RequestMethod.POST)
	public String sendAmqbMsg3(Model model, @RequestParam(value = "msg", defaultValue = "hello world!!!") String msg) {
		if (model != null && !"".equals(msg)) {
			amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界!!!");
		} else {
			amqpTemplate.convertAndSend("mq.qwerExChange", "mq.qwer.send", "神奇的世界");
		}
		return "success";
	}

	/**
	 * Sends {@code msg} using {@code mqttQueue} as the routing key, without
	 * naming an exchange.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   message text; nothing is sent when empty
	 * @return {@code "success"}
	 */
	@RequestMapping(value = "/helloWorld", method = RequestMethod.POST)
	@ApiOperation("1对1 or 1对多 无交换机模式")
	public String helloWorld(Model model, @RequestParam(value = "msg", defaultValue = "1对1,无交换机模式!") String msg) {
		if (model != null && !"".equals(msg)) {
			amqpTemplate.convertAndSend("mqttQueue", msg);
		}
		return "success";
	}

	/**
	 * Publishes {@code msg} to {@code pubSubExchange} with an empty routing key.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   message text; nothing is sent when empty
	 * @return {@code "success"}
	 */
	@RequestMapping(value = "/pubSub", method = RequestMethod.POST)
	@ApiOperation("广播发布/订阅模式")
	public String pubSub(Model model, @RequestParam(value = "msg", defaultValue = "广播发布/订阅模式") String msg) {
		if (model != null && !"".equals(msg)) {
			// In broadcast mode the routing key is irrelevant: every consumer receives the message.
			amqpTemplate.convertAndSend("pubSubExchange", "", msg);
		}
		return "success";
	}

	/**
	 * Sends {@code msg} to {@code routingExchange} once per routing key
	 * {@code info}, {@code warn} and {@code error}.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   message text; nothing is sent when empty
	 * @return {@code "success"}
	 */
	@RequestMapping(value = "/routing", method = RequestMethod.POST)
	@ApiOperation("路由消息模式")
	public String routing(Model model, @RequestParam(value = "msg", defaultValue = "路由消息模式") String msg) {
		if (model != null && !"".equals(msg)) {
			String[] infoTyp = new String[] { "info", "warn", "error" };
			for (String routing : infoTyp) {
				amqpTemplate.convertAndSend("routingExchange", routing, msg);
			}
		}
		return "success";
	}

	/**
	 * Sends {@code msg} to {@code topicExchange} once per sample routing key.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   message text; nothing is sent when empty
	 * @return {@code "success"}
	 */
	@RequestMapping(value = "/topicMatch", method = RequestMethod.POST)
	@ApiOperation("主题模式")
	public String topicModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg) {
		if (model != null && !"".equals(msg)) {
			String[] infoTyp = new String[] { "P.123.asdasd", "P.456.JQBE", "P.789.WBD", "P.ASBDJBAS" };
			for (String routing : infoTyp) {
				amqpTemplate.convertAndSend("topicExchange", routing, msg);
			}
		}
		return "success";
	}

	/**
	 * Sends {@code msg} to {@code headerExchange} three times, each message
	 * carrying a single distinct header.
	 *
	 * @param model MVC model; only checked for null
	 * @param msg   message text, encoded as UTF-8; nothing is sent when empty
	 * @return {@code "success"}
	 * @throws UnsupportedEncodingException if the {@code utf-8} charset name is not supported
	 */
	@RequestMapping(value = "/headerModal", method = RequestMethod.POST)
	@ApiOperation("header模式")
	public String headerModal(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg)
			throws UnsupportedEncodingException {
		if (model != null && !"".equals(msg)) {
			Map<String, Object> map = new HashMap<String, Object>();
			map.put("abc", "nb");
			map.put("def", "pl");
			map.put("jabs", "aksd");
			for (Entry<String, Object> entry : map.entrySet()) {
				MessageProperties messageProperties = new MessageProperties();
				messageProperties.setHeader(entry.getKey(), entry.getValue());
				Message message = new Message(msg.getBytes("utf-8"), messageProperties);
				amqpTemplate.convertAndSend("headerExchange", null, message);
			}
		}
		return "success";
	}

	/**
	 * Declares a durable queue named {@code queueName} and sends an
	 * {@link OperatEntity} populated from {@code msg} to it.
	 *
	 * @param model     MVC model; only checked for null
	 * @param msg       used as both record id and table name of the entity
	 * @param queueName queue to declare and send to
	 * @return {@code "success"}
	 * @throws UnsupportedEncodingException declared for signature compatibility
	 */
	@RequestMapping(value = "/createTaskQueue", method = RequestMethod.POST)
	@ApiOperation("自动创建队列并发送消息到队列")
	public String createTaskQueue(Model model, @RequestParam(value = "msg", defaultValue = "主题模式") String msg,
			@RequestParam(name = "queueName", defaultValue = "ff008") String queueName)
			throws UnsupportedEncodingException {
		if (model != null && !"".equals(msg)) {
			amqpAdmin.declareQueue(new Queue(queueName, true, false, false));
			OperatEntity operatEntity = new OperatEntity();
			operatEntity.setId("123456");
			operatEntity.setIndepeid("askdnad");
			operatEntity.setRecordid(msg);
			operatEntity.setTablename(msg);
			amqpTemplate.convertAndSend(queueName, operatEntity);
			// The entity can be read back with amqpTemplate.receiveAndConvert(queueName).
		}
		return "success";
	}

	/**
	 * Deletes every queue whose name starts with the given prefix.
	 *
	 * <p>An empty prefix matches every queue name, so it is rejected rather
	 * than deleting all queues on the broker; a missing prefix is rejected with
	 * a descriptive error instead of a {@link NullPointerException}.
	 *
	 * @param queueNamePre non-empty queue name prefix
	 * @return {@code "success"}
	 * @throws IllegalArgumentException if {@code queueNamePre} is null or empty
	 */
	@RequestMapping(value = "/deleteQueue", method = RequestMethod.POST)
	@ApiOperation("删除以 任务名称为前缀的队列")
	public String deleteQueueWithPre(String queueNamePre) {
		String prefix = requireNonEmpty(queueNamePre, "queueNamePre");
		List<Queue> queues = amqpManagementOperations.getQueues();
		for (Queue queue : queues) {
			if (queue.getName().startsWith(prefix)) {
				amqpManagementOperations.deleteQueue(queue);
			}
		}
		return "success";
	}

	/**
	 * Declares a durable direct exchange and a durable queue named
	 * {@code exchangeName-<8 random chars>}, binds them with the random part as
	 * routing key, sends {@code msg} through the exchange and prints whatever is
	 * read back from the queue.
	 *
	 * @param model        MVC model; only checked for null
	 * @param msg          message text; nothing happens when empty
	 * @param exchangeName name of the exchange to declare
	 * @return {@code "success"}
	 * @throws UnsupportedEncodingException declared for signature compatibility
	 */
	@RequestMapping(value = "/createExchangeBindTaskQueue", method = RequestMethod.POST)
	@ApiOperation("自动创建交换机并绑定队列")
	public String createExchangeBindTaskQueue(Model model,
			@RequestParam(value = "msg", defaultValue = "主题模式") String msg,
			@RequestParam(name = "exchangeName", defaultValue = "ff008") String exchangeName)
			throws UnsupportedEncodingException {
		if (model != null && !"".equals(msg)) {
			// The exchange is declared unconditionally; no existence check is made first.
			String randomNum = UUID.randomUUID().toString().substring(0, 8);
			Exchange exchange = ExchangeBuilder.directExchange(exchangeName).durable(true).build();
			Queue queue = QueueBuilder.durable(exchangeName + "-" + randomNum).build();
			amqpAdmin.declareExchange(exchange);
			amqpAdmin.declareQueue(queue);
			// Bind the queue to the exchange; the routing key is the 8-character random string.
			amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with(randomNum).noargs());
			amqpTemplate.convertAndSend(exchangeName, randomNum, msg);
			String result = (String) amqpTemplate.receiveAndConvert(queue.getName());
			System.out.println(result);
		}
		return "success";
	}

	/**
	 * Deletes the named exchange together with the queues this controller
	 * created for it: for each binding of the exchange in virtual host
	 * {@code /}, the queue {@code exchangeName-<routingKey>} is deleted first.
	 *
	 * @param exchangeName non-empty name of the exchange to delete
	 * @return {@code "success"}
	 * @throws IllegalArgumentException if {@code exchangeName} is null or empty
	 */
	@RequestMapping(value = "/deleteExchange", method = RequestMethod.POST)
	@ApiOperation("删除以 任务名称的交换机")
	public String deleteExchange(String exchangeName) {
		String name = requireNonEmpty(exchangeName, "exchangeName");
		List<Binding> bindings = amqpManagementOperations.getBindingsForExchange("/", name);
		for (Binding binding : bindings) {
			String routingkey = binding.getRoutingKey();
			amqpAdmin.deleteQueue(name + "-" + routingkey);
		}
		amqpAdmin.deleteExchange(name);
		return "success";
	}
	// RPC-style invocation is not implemented.

	/** Returns {@code value} unchanged, rejecting null or empty request parameters. */
	private static String requireNonEmpty(String value, String paramName) {
		if (value == null || value.isEmpty()) {
			throw new IllegalArgumentException(paramName + " must not be null or empty");
		}
		return value;
	}
}

2. RedisTemplate 的使用

 

package com.devframe.common.util;

import java.util.List;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * Convenience wrapper around {@link StringRedisTemplate} for String values,
 * lists, key lookup and a simple timestamp-based lock.
 */
@Component
public class RedisTemplateUtils {

	@Autowired
	private StringRedisTemplate stringRedisTemplate;

	/** Lock timeout constant; not referenced within this class. */
	public static final int LOCK_TIMEOUT = 4;

	// --------------- operations on String values

	/**
	 * Stores a String value under the key.
	 *
	 * @param key   Redis key
	 * @param value value to store
	 */
	public void set(String key, String value) {
		stringRedisTemplate.opsForValue().set(key, value);
	}

	/**
	 * Reads the String value stored under the key.
	 *
	 * @param key Redis key
	 * @return the value returned by the template for this key
	 */
	public String get(String key) {
		return stringRedisTemplate.opsForValue().get(key);
	}

	/**
	 * Deletes the key.
	 *
	 * @param key Redis key
	 */
	public void del(String key) {
		stringRedisTemplate.delete(key);
	}

	// --------------- operations on List values

	/**
	 * Pushes a value onto the head of the list.
	 *
	 * @param key   Redis key of the list
	 * @param value element to push
	 */
	public void lpush(String key, String value) {
		stringRedisTemplate.opsForList().leftPush(key, value);
	}

	/**
	 * Reads the list element at the given index.
	 *
	 * @param key   Redis key of the list
	 * @param index element index
	 * @return the element returned by the template for that index
	 */
	public String lindex(String key, long index) {
		return stringRedisTemplate.opsForList().index(key, index);
	}

	/**
	 * Returns the length of the list.
	 *
	 * <p>The template's {@code size} call returns a boxed {@code Long}; a
	 * {@code null} result is reported as {@code 0} instead of failing on
	 * auto-unboxing.
	 *
	 * @param key Redis key of the list
	 * @return the list length, or {@code 0} when no length is available
	 */
	public long llen(String key) {
		Long size = stringRedisTemplate.opsForList().size(key);
		return size == null ? 0L : size;
	}

	/**
	 * Reads a range of list elements.
	 *
	 * @param key   Redis key of the list
	 * @param start start index
	 * @param end   end index
	 * @return the elements in the requested range
	 */
	public List<String> lrange(String key, long start, long end) {
		return stringRedisTemplate.opsForList().range(key, start, end);
	}

	/**
	 * Finds all keys matching a pattern, e.g. {@code login*}.
	 *
	 * <p>NOTE(review): pattern scans over the whole keyspace can be expensive on
	 * large databases — confirm this is acceptable where it is called.
	 *
	 * @param key key pattern
	 * @return the matching keys
	 */
	public Set<String> keys(String key) {
		return stringRedisTemplate.keys(key);
	}

	/**
	 * Tries to acquire the lock.
	 *
	 * <p>The lock value is an expiry timestamp in milliseconds. If the key is
	 * absent the lock is taken directly. Otherwise, if the stored timestamp is
	 * already in the past, the lock is taken over with get-and-set, and only the
	 * caller that observes the previous stale value wins.
	 *
	 * @param key   lock key, e.g. a product's unique id
	 * @param value expiry timestamp (current time plus timeout) as a decimal string
	 * @return {@code true} if the lock was acquired
	 * @throws NumberFormatException if the value currently stored is not a number
	 */
	public boolean lock(String key, String value) {
		// setIfAbsent returns a boxed Boolean; compare against TRUE so that a
		// null result is treated as "not acquired" instead of throwing on unboxing.
		if (Boolean.TRUE.equals(stringRedisTemplate.opsForValue().setIfAbsent(key, value))) {
			// The key did not exist, so the lock is ours.
			return true;
		}

		// Check for an expired lock, so that a holder that failed before
		// unlocking cannot block everyone forever.
		String currentValue = stringRedisTemplate.opsForValue().get(key);
		if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {
			// Atomically install our value and fetch the one it replaced.
			String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value);

			// If two threads arrive here together, both read the same stale
			// currentValue. getAndSet runs one at a time, so only the first
			// sees that stale value as the previous one; the second sees the
			// first thread's value and therefore fails the comparison below.
			if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Releases the lock if it still holds the caller's value.
	 *
	 * <p>NOTE(review): the read and the delete are two separate calls, so the
	 * lock could change hands between them — confirm whether that matters here.
	 *
	 * @param key   lock key
	 * @param value the value that was passed to {@link #lock(String, String)}
	 */
	public void unlock(String key, String value) {
		try {
			String currentValue = stringRedisTemplate.opsForValue().get(key);
			if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
				stringRedisTemplate.delete(key);
			}
		} catch (Exception e) {
			// Unlock is best-effort: failures are logged, not propagated.
			Log.error(e);
		}
	}

}

 3.MqttGateway 的使用

          @Autowired
          private MqttGateway mqttGateway;

          /**
           * Publishes {@code sendData} to {@code topic} through both topic-aware
           * gateway overloads (the second one with QoS 0).
           *
           * @param sendData payload to publish
           * @param topic    destination topic
           * @return a success text, or a failure text if either send throws
           */
          @RequestMapping("/sendMsg")
          public String sendMsg(String sendData, String topic) {
              String outcome;
              try {
                  mqttGateway.sendToMqtt(sendData, topic);
                  mqttGateway.sendToMqtt(topic, 0, sendData);
                  outcome = "发送成功";
              } catch (Exception e) {
                  // Failures are logged and reported to the caller as text.
                  Log.error(e);
                  outcome = "发送失败";
              }
              return outcome;
          }

4、jmsMessagingTemplate 的使用

// Field: inject the JMS messaging template.
@Autowired
private JmsMessagingTemplate  jmsMessagingTemplate;
// Usage (inside a method): send the payload to the topic named by ActivemqConfig.topicName.
jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(ActivemqConfig.topicName), payload);

 

 

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐