缓存方式幂等性工具、消息队列方式幂等性工具、检测客户端服务端是否在线

r囧r小猫 2023-01-08 10:29 59阅读 0赞

1、缓存方式幂等性工具

  1. import cn.hutool.core.util.IdUtil;
  2. import com.alibaba.fastjson.JSON;
  3. import org.apache.commons.lang3.StringUtils;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.data.redis.core.StringRedisTemplate;
  6. import java.util.concurrent.TimeUnit;
  7. /** * 通过唯一ID判断数据是否存在 * 主要逻辑 存入数据不通过mysql主键进行做唯一标识, * 采用redis天然的幂等性,插入数据进行存储唯一标识,如果插入数据进行判断存在,不进行插入数据 */
  8. public class IdentityHash {
  9. @Autowired
  10. private StringRedisTemplate stringRedisTemplate;
  11. /** * 将对象存入缓存中 * @param obj 对象数据 * @param timeout 超时时间 */
  12. public void set(Object obj, long timeout) {
  13. if (obj instanceof String) {
  14. stringRedisTemplate.opsForValue().set(JSON.toJSONString(System.identityHashCode(obj)), IdUtil.randomUUID(), timeout, TimeUnit.MILLISECONDS);
  15. return;
  16. }
  17. stringRedisTemplate.opsForValue().set(JSON.toJSONString(System.identityHashCode(obj)), IdUtil.randomUUID(), timeout, TimeUnit.MILLISECONDS);
  18. }
  19. /** * 根据Key查询缓存中的数据 * @param key * @return */
  20. public String get(final String key) {
  21. if (StringUtils.isEmpty(key)) {
  22. return null;
  23. }
  24. return stringRedisTemplate.opsForValue().get(key);
  25. }
  26. /** * 根据key删除缓存数据 * @param key */
  27. public void delete(String key) {
  28. stringRedisTemplate.delete(key);
  29. }
  30. /** * 查询缓存是否存在 * @param checkCase * @return */
  31. public boolean checkForm(String checkCase){
  32. String cacheValue = get(checkCase);
  33. /**如果查询缓存不为空,返回true*/
  34. if (StringUtils.isNotEmpty(cacheValue)){
  35. return true;
  36. }
  37. return false;
  38. }
  39. }

2、消息队列方式幂等性工具

  1. import com.alibaba.fastjson.JSON;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.Data;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.core.AmqpTemplate;
  6. import org.springframework.amqp.core.Message;
  7. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.stereotype.Service;
  11. import java.io.IOException;
  12. @Slf4j
  13. @Service
  14. public class AmqpHash {
  15. @Autowired
  16. private AmqpTemplate amqpTemplate;
  17. /** * 发送消息 * * @param queueName 队列名称 */
  18. public void send(String queueName, Object obj) {
  19. Messages messages = new Messages();
  20. messages.setObj(obj);
  21. messages.setMessageId(String.valueOf(System.identityHashCode(obj)));
  22. amqpTemplate.convertAndSend(queueName, JSON.toJSONString(messages));
  23. }
  24. @RabbitHandler
  25. @RabbitListener(queues = "queue", concurrency = "1")
  26. public void process(String msg, Message message, Channel channel) throws IOException {
  27. log.debug("消费者接收到的消息 :{}, 来源:{} 时间:{}", msg, channel.getChannelNumber(), System.currentTimeMillis());
  28. try {
  29. if(msg.contains("5")){
  30. throw new RuntimeException("抛出异常");
  31. }
  32. log.info("消息{}消费成功",msg);
  33. //消息Id
  34. message.getMessageProperties().getMessageId();
  35. channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  36. } catch (Exception e) {
  37. log.error("接收消息过程中出现异常,执行nack");
  38. //第三个参数为true表示异常消息重新返回队列,会导致一直在刷新消息,且返回的消息处于队列头部,影响后续消息的处理
  39. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  40. log.error("消息{}异常",message.getMessageProperties().getHeaders());
  41. }
  42. }
  43. @Data
  44. static class Messages{
  45. private Object obj;
  46. private String messageId;
  47. private String uuid;
  48. }
  49. }

3、检测客户端服务端是否在线

一、服务端:

  1. import io.netty.bootstrap.ServerBootstrap;
  2. import io.netty.channel.*;
  3. import io.netty.channel.group.ChannelGroup;
  4. import io.netty.channel.group.DefaultChannelGroup;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioServerSocketChannel;
  8. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  9. import io.netty.handler.codec.Delimiters;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import io.netty.handler.timeout.IdleStateEvent;
  13. import io.netty.handler.timeout.IdleStateHandler;
  14. import io.netty.util.concurrent.GlobalEventExecutor;
  15. import lombok.extern.slf4j.Slf4j;
  16. import java.util.concurrent.TimeUnit;
  17. /** * 启动服务端服务 * 需要开启一个新的线程来执行netty 服务端 或者客户端 * ThreadUtil.execute(() -> { * Server.initServer(); * Client.initClient(); * }); */
  18. @Slf4j
  19. public class Server {
  20. /** * 装有所有客户端channel的组 */
  21. private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  22. static class ServerHeader extends SimpleChannelInboundHandler<String> {
  23. @Override
  24. protected void channelRead0(ChannelHandlerContext ctx, String msg) {
  25. Channel channel = ctx.channel();
  26. //循环channel组,判断是不是其它客户端发送的消息
  27. channelGroup.stream().forEach(ch -> {
  28. if (channel != ch) {
  29. ch.writeAndFlush(channel.remoteAddress() + "---发送的消息为:" + msg + "\n");
  30. } else {
  31. ch.writeAndFlush("【自己的消息】" + msg + "\n");
  32. }
  33. });
  34. }
  35. /** * 用户事件触发方法 判断事件 * * @param ctx 上下文对象 * @param evt 事件对象 */
  36. @Override
  37. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
  38. //判断该事件是否为超时事件
  39. if (evt instanceof IdleStateEvent) {
  40. IdleStateEvent event = (IdleStateEvent) evt;
  41. String type;
  42. //判断具体事件 具体项目可以根据不同情况进行不同处理
  43. switch (event.state()) {
  44. case READER_IDLE:
  45. type = "读空闲";
  46. break;
  47. case WRITER_IDLE:
  48. type = "写空闲";
  49. break;
  50. default:
  51. type = "读写空闲";
  52. }
  53. log.info("%s==超时事件==%s\n", ctx.channel().remoteAddress().toString(), type);
  54. ctx.channel().close();
  55. }
  56. }
  57. @Override
  58. public void handlerAdded(ChannelHandlerContext ctx) {
  59. Channel channel = ctx.channel();
  60. //通知其它客户端
  61. channelGroup.writeAndFlush("【服务端:】" + channel.remoteAddress() + "加入\n");
  62. channelGroup.add(channel);
  63. }
  64. @Override
  65. public void channelActive(ChannelHandlerContext ctx) {
  66. Channel channel = ctx.channel();
  67. log.info("【客户端:】" + channel.remoteAddress() + "上线\n");
  68. }
  69. @Override
  70. public void channelInactive(ChannelHandlerContext ctx) {
  71. Channel channel = ctx.channel();
  72. log.info("【客户端:】" + channel.remoteAddress() + "下线\n");
  73. }
  74. @Override
  75. public void channelUnregistered(ChannelHandlerContext ctx) {
  76. Channel channel = ctx.channel();
  77. channelGroup.writeAndFlush("【客户端:】" + channel.remoteAddress() + "离开\n");
  78. //此处不用从组中一处,掉线之后netty会自动的从组中移除
  79. }
  80. }
  81. static class ServerInit extends ChannelInitializer<SocketChannel> {
  82. @Override
  83. protected void initChannel(SocketChannel socketChannel) {
  84. ChannelPipeline pipeline = socketChannel.pipeline();
  85. //添加处理器,netty通过解决不同的情景来添加不同的处理器
  86. pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
  87. .addLast(new StringDecoder())
  88. .addLast(new StringEncoder())
  89. .addLast(new IdleStateHandler(100,200,300, TimeUnit.SECONDS))
  90. .addLast(new ServerHeader());
  91. }
  92. }
  93. /** * 初始化 */
  94. public static void initServer(){
  95. EventLoopGroup bossGroup = new NioEventLoopGroup();
  96. EventLoopGroup workerGroup = new NioEventLoopGroup();
  97. try {
  98. ServerBootstrap serverBootstrap = new ServerBootstrap();
  99. serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new ServerInit());
  100. // 端口可以自定义
  101. ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
  102. channelFuture.channel().closeFuture().sync();
  103. } catch (Exception e){
  104. log.error("初始化失败:{}",e);
  105. } finally {
  106. bossGroup.shutdownGracefully();
  107. workerGroup.shutdownGracefully();
  108. }
  109. }
  110. }

二、客户端:

  1. import cn.hutool.cron.CronUtil;
  2. import cn.hutool.cron.task.Task;
  3. import io.netty.bootstrap.Bootstrap;
  4. import io.netty.channel.*;
  5. import io.netty.channel.nio.NioEventLoopGroup;
  6. import io.netty.channel.socket.SocketChannel;
  7. import io.netty.channel.socket.nio.NioSocketChannel;
  8. import io.netty.handler.codec.DelimiterBasedFrameDecoder;
  9. import io.netty.handler.codec.Delimiters;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import lombok.extern.slf4j.Slf4j;
  13. import java.io.BufferedReader;
  14. import java.io.ByteArrayInputStream;
  15. import java.io.IOException;
  16. import java.io.InputStreamReader;
  17. /** * 启动客户端服务 * 需要开启一个新的线程来执行netty 服务端 或者客户端 * ThreadUtil.execute(() -> { * Server.initServer(); * Client.initClient(); * }); */
  18. @Slf4j
  19. public class Client {
  20. static class ClientHeader extends SimpleChannelInboundHandler<String> {
  21. @Override
  22. protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
  23. log.info("服务端消息:" + msg);
  24. }
  25. }
  26. static class ClientInit extends ChannelInitializer<SocketChannel> {
  27. @Override
  28. protected void initChannel(SocketChannel socketChannel) {
  29. ChannelPipeline pipeline = socketChannel.pipeline();
  30. //添加处理器,netty通过解决不同的情景来添加不同的处理器
  31. pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
  32. .addLast(new StringDecoder())
  33. .addLast(new StringEncoder())
  34. .addLast(new ClientHeader());
  35. }
  36. }
  37. /** * 初始化客户端 */
  38. public static void initClient(){
  39. EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
  40. try {
  41. Bootstrap bootstrap = new Bootstrap();
  42. bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ClientInit());
  43. //获得channel对象
  44. Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
  45. // 支持秒级别定时任务
  46. CronUtil.setMatchSecond(true);
  47. CronUtil.start();
  48. CronUtil.schedule("*/2 * * * * *", (Task) () -> {
  49. String str = String.valueOf(System.currentTimeMillis());
  50. BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(str.getBytes())));
  51. try {
  52. channel.writeAndFlush(br.readLine() + "\r\n");
  53. } catch (IOException e) {
  54. log.error("通道已关闭:{}",e);
  55. }
  56. });
  57. } catch (Exception e) {
  58. log.info("启动客户端失败:{}",e);
  59. } finally {
  60. eventLoopGroup.shutdownGracefully();
  61. }
  62. }
  63. }

三、springboot方式启动:

  1. import cn.hutool.core.thread.ThreadUtil;
  2. import com.imooc.stream.utils.Client;
  3. import com.imooc.stream.utils.Server;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.boot.CommandLineRunner;
  6. import org.springframework.boot.SpringApplication;
  7. import org.springframework.boot.autoconfigure.SpringBootApplication;
  8. import org.springframework.boot.builder.SpringApplicationBuilder;
  9. import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
  10. @Slf4j
  11. @SpringBootApplication
  12. public class StreamApplication extends SpringBootServletInitializer implements CommandLineRunner {
  13. public static void main(String[] args) {
  14. SpringApplication.run(StreamApplication.class, args);
  15. }
  16. /** * 支持使用外置的tomcat启动 * @param application * @return */
  17. @Override
  18. protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
  19. log.info("启动加载自定义的ServletInitializer");
  20. return application.sources(StreamApplication.class);
  21. }
  22. @Override
  23. public void run(String... args) {
  24. log.info("启动成功, 同时启动netty服务");
  25. //需要开启一个新的线程来执行netty 服务端 或者客户端
  26. ThreadUtil.execute(() -> {
  27. Server.initServer();
  28. Client.initClient();
  29. });
  30. }
  31. }

发表评论

表情:
评论列表 (有 0 条评论,59人围观)

还没有评论,来说两句吧...

相关阅读

    相关 消息队列

    一、什么是幂等性 可以参考数据库乐观锁机制,比如执行一条更新库存的 SQL 语句,在并发场景,为了性能和数据可靠性,会在更新时加上查询时的版本,并且更新这个版本信息。可能你

    相关

    幂等性 1.什么是幂等性? 对于同一笔业务操作,不管调用多少次,得到的结果都是一样的。 2.幂等性技术方案 1.悲观锁 获取数据的时候加锁获取。sel

    相关 实现 -接口

    接口幂等性 1.什么是幂等性 > 对于同一笔业务操作,不管调用多少次,得到的结果都是一样的。 > 也就是方法调用一次和调用多次产生的额外效果是相同的,他就具有幂

    相关

    老婆问了个问题,什么是“幂等性”?这个问题,从现象上好解释,例如今儿是618大促,购物车添加了丰富的商品,满心欢喜地点击了支付按钮,支付成 功了,但是返回的时候网络异常,不知道

    相关

    HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影