redis-pub/sub 清疚 2022-05-21 20:08 99阅读 0赞 使用redis实现简单的发布订阅功能,很简单看看小例子 RedisUtil工具类 package com.redis.util; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisUtil { private static final Logger logger = Logger.getLogger(RedisUtil.class); //Redis服务器IP private static String ADDR = "192.168.72.129"; //Redis的端口号 private static int PORT = 6380; //可用连接实例的最大数目,默认值为8; //如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 private static int MAX_ACTIVE = 1024; //控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。 private static int MAX_IDLE = 200; //等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException; private static int MAX_WAIT = 10000; private static int TIMEOUT = 10000; //在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的; private static boolean TEST_ON_BORROW = true; private static JedisPool jedisPool = null; static { try { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(MAX_ACTIVE); config.setMaxIdle(MAX_IDLE); config.setMaxWaitMillis(MAX_WAIT); config.setTestOnBorrow(TEST_ON_BORROW); jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT); } catch (Exception e) { e.printStackTrace(); } } /** * 获取Jedis实例 * @return */ public synchronized static Jedis getJedis() { try { if (jedisPool != null) { Jedis resource = jedisPool.getResource(); return resource; } else { return null; } } catch (Exception e) { e.printStackTrace(); return null; } } /** * 释放jedis资源 * @param jedis */ public static void closeResource(final Jedis jedis) { if (jedis != null) { jedis.close(); } } /** * 删除key */ public Long delkeyObject(String key) { Jedis jedis = null; try { jedis = jedisPool.getResource(); return jedis.del(key.getBytes()); }catch(Exception e) { e.printStackTrace(); return null; }finally{ if(jedis != null) { jedis.close(); } } } public Boolean existsObject(String key) { Jedis jedis = null; try { jedis = jedisPool.getResource(); return jedis.exists(key.getBytes()); }catch(Exception e) { e.printStackTrace(); return null; }finally{ if(jedis != null) { jedis.close(); } } } } 订阅者Sub onMessage方法接收数据 package com.redis.pubsub; import redis.clients.jedis.JedisPubSub; public class Sub extends JedisPubSub{ public Sub() { } /** * 取得订阅的消息后的处理 * @param channel 频道 * @param message 收到的消息 */ public void onMessage(String channel, String message) { System.out.println(String.format("###receive redis published message, channel %s, message %s", channel, message)); } /** * 初始化订阅时候的处理 * @param channel * @param subscribedChannels */ public void onSubscribe(String channel, int subscribedChannels) { System.out.println(String.format("###订阅redis频道成功,channel %s,subscribedChannels %d",channel, subscribedChannels)); } /** * 取消订阅时候的处理 * @param channel * @param subscribedChannels */ public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println(String.format("###取消订阅 redis channel, channel %s, subscribedChannels %d",channel, subscribedChannels)); } /** * 初始化按表达式的方式订阅时候的处理 * @param pattern * @param subscribedChannels */ public void onPSubscribe(String pattern, int subscribedChannels) { System.out.println(pattern + "=" + subscribedChannels); } /** * 取消按表达式的方式订阅时候的处理 * @param pattern * @param subscribedChannels */ public void onPUnsubscribe(String pattern, int subscribedChannels) { System.out.println(pattern + "=" + subscribedChannels); } /** * 取得按表达式的方式订阅的消息后的处理 * @param pattern * @param channel * @param message */ public void onPMessage(String pattern, String channel, String message) { System.out.println(pattern + "=" + channel + "=" + message); } } 订阅启动 监听频道为1030 public class PSTest { public static void main(String[] args) { //这里使用的是简单的main启动 Jedis jedis = RedisUtil.getJedis(); jedis.subscribe(new Sub(),"1030"); } } 发布者Pub package com.redis.pubsub; import com.redis.util.RedisUtil; import redis.clients.jedis.Jedis; import java.io.BufferedReader; import java.io.InputStreamReader; public class Pub{ public static void main(String[] args) { Jedis jedis = RedisUtil.getJedis(); int a = 1; while (true){ //向1030频道发布数据 jedis.publish("1030","message"+a); a++; try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } 打印结果: ###订阅redis频道成功,channel 1030,subscribedChannels 1 ###receive redis published message, channel 1030, message message24 ###receive redis published message, channel 1030, message message25 ###receive redis published message, channel 1030, message message26 ###receive redis published message, channel 1030, message message27 ###receive redis published message, channel 1030, message message28 这里需要注意的是一定要是订阅者阻塞。不然可能会出现丢失数据的情况。 也就是说一定要先订阅,然后再发布数据。 如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
还没有评论,来说两句吧...