激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - RocketMQ獲取指定消息的實現方法(源碼)

RocketMQ獲取指定消息的實現方法(源碼)

2020-08-16 14:26貓毛·波拿巴 Java教程

這篇文章主要給大家介紹了關于RocketMQ獲取指定消息的實現方法,文中通過示例代碼介紹的非常詳細,對大家學習或者使用RocketMQ具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧

概要

消息查詢是什么?

消息查詢就是根據用戶提供的msgId從MQ中取出該消息

RocketMQ如果有多個節點如何查詢?

問題:RocketMQ分布式結構中,數據分散在各個節點,即便是同一Topic的數據,也未必都在一個broker上。客戶端怎么知道數據該去哪個節點上查?

猜想1:逐個訪問broker節點查詢數據

猜想2:有某種數據中心存在,該中心知道所有消息存儲的位置,只要向該中心查詢即可得到消息具體位置,進而取得消息內容

實際:

1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。

2.客戶端實現會從msgId字符串中解析出broker地址,向指定broker節查詢消息。

問題:CommitLog文件有多個,只有偏移量估計不能確定在哪個文件吧?

實際:單個Broker節點內offset是全局唯一的,不是每個CommitLog文件的偏移量都是從0開始的。單個節點內所有CommitLog文件共用一套偏移量,每個文件的文件名為其第一個消息的偏移量。所以可以根據偏移量和文件名確定CommitLog文件。

源碼閱讀

0.使用方式

MessageExt  msg = consumer.viewMessage(msgId);

1.消息ID解析

這個了解下就可以了

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MessageId {
 private SocketAddress address;
 private long offset;
 
 public MessageId(SocketAddress address, long offset) {
  this.address = address;
  this.offset = offset;
 }
 
 //get-set
}
 
//from MQAdminImpl.java
public MessageExt viewMessage(
 String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
 
 MessageId messageId = null;
 try {
  //從msgId字符串中解析出address和offset
  //address = ip:port
  //offset為消息在CommitLog文件中的偏移量
  messageId = MessageDecoder.decodeMessageId(msgId);
 } catch (Exception e) {
  throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
 }
 return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
  messageId.getOffset(), timeoutMillis);
}
 
//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
 SocketAddress address;
 long offset;
 //ipv4和ipv6的區別
 //如果msgId總長度超過32字符,則為ipv6
 int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
 
 byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
 byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
 ByteBuffer bb = ByteBuffer.wrap(port);
 int portInt = bb.getInt(0);
 address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
 
 // offset
 byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
 bb = ByteBuffer.wrap(data);
 offset = bb.getLong(0);
 
 return new MessageId(address, offset);
}

2.長連接客戶端RPC實現

要發請求首先得先建立連接,這里方法可以看到創建連接相關的操作。值得注意的是,第一次訪問的時候可能連接還沒建立,建立連接需要消耗一段時間。代碼中對這個時間也做了判斷,如果連接建立完成后,發現已經超時,則不再發出請求。目的應該是盡可能減少請求線程的阻塞時間。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//from NettyRemotingClient.java
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
 throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
 long beginStartTime = System.currentTimeMillis();
 //這里會先檢查有無該地址的通道,有則返回,無則創建
 final Channel channel = this.getAndCreateChannel(addr);
 if (channel != null && channel.isActive()) {
  try {
   //前置鉤子
   doBeforeRpcHooks(addr, request);
   //判斷通道建立完成時是否已到達超時時間,如果超時直接拋出異常。不發請求
   long costTime = System.currentTimeMillis() - beginStartTime;
   if (timeoutMillis < costTime) {
    throw new RemotingTimeoutException("invokeSync call timeout");
   }
   //同步調用
   RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
   //后置鉤子
   doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置鉤子
   return response;
  } catch (RemotingSendRequestException e) {
   log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
   this.closeChannel(addr, channel);
   throw e;
  } catch (RemotingTimeoutException e) {
   if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
    this.closeChannel(addr, channel);
    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
   }
   log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
   throw e;
  }
 } else {
  this.closeChannel(addr, channel);
  throw new RemotingConnectException(addr);
 }
}

下一步看看它的同步調用做了什么處理。注意到它會構建一個Future對象加入待響應池,發出請求報文后就掛起線程,然后等待喚醒(waitResponse內部使用CountDownLatch等待)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
 final long timeoutMillis)
 throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
 //請求id
 final int opaque = request.getOpaque();
 
 try {
  //請求存根
  final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
  //加入待響應的請求池
  this.responseTable.put(opaque, responseFuture);
  final SocketAddress addr = channel.remoteAddress();
  //將請求發出,成功發出時更新狀態
  channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
   @Override
   public void operationComplete(ChannelFuture f) throws Exception {
    if (f.isSuccess()) { //若成功發出,更新請求狀態為“已發出”
     responseFuture.setSendRequestOK(true);
     return;
    } else {
     responseFuture.setSendRequestOK(false);
    }
 
    //若發出失敗,則從池中移除(沒用了,釋放資源)
    responseTable.remove(opaque);
    responseFuture.setCause(f.cause());
    //putResponse的時候會喚醒等待的線程
    responseFuture.putResponse(null);
    log.warn("send a request command to channel <" + addr + "> failed.");
   }
  });
 
  //只等待一段時間,不會一直等下去
  //若正常響應,則收到響應后,此線程會被喚醒,繼續執行下去
  //若超時,則到達該時間后線程蘇醒,繼續執行
  RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
  if (null == responseCommand) {
   if (responseFuture.isSendRequestOK()) {
    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
     responseFuture.getCause());
   } else {
    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
   }
  }
 
  return responseCommand;
 } finally {
  //正常響應完成時,將future釋放(正常邏輯)
  //超時時,將future釋放。這個請求已經作廢了,后面如果再收到響應,就可以直接丟棄了(由于找不到相關的響應鉤子,就不處理了)
  this.responseTable.remove(opaque);
 }
}

好,我們再來看看收到報文的時候是怎么處理的。我們都了解JDK中的Future的原理,大概就是將這個任務提交給其他線程處理,該線程處理完畢后會將結果寫入到Future對象中,寫入時如果有線程在等待該結果,則喚醒這些線程。這里也差不多,只不過執行線程在服務端,服務執行完畢后會將結果通過長連接發送給客戶端,客戶端收到后根據報文中的ID信息從待響應池中找到Future對象,然后就是類似的處理了。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
 
 //底層解碼完畢得到RemotingCommand的報文
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  processMessageReceived(ctx, msg);
 }
}
 
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
 final RemotingCommand cmd = msg;
 if (cmd != null) {
  //判斷類型
  switch (cmd.getType()) {
   case REQUEST_COMMAND:
    processRequestCommand(ctx, cmd);
    break;
   case RESPONSE_COMMAND:
    processResponseCommand(ctx, cmd);
    break;
   default:
    break;
  }
 }
}
 
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
 //取得消息id
 final int opaque = cmd.getOpaque();
 //從待響應池中取得對應請求
 final ResponseFuture responseFuture = responseTable.get(opaque);
 if (responseFuture != null) {
  //將響應值注入到ResponseFuture對象中,等待線程可從這個對象獲取結果
  responseFuture.setResponseCommand(cmd);
  //請求已處理完畢,釋放該請求
  responseTable.remove(opaque);
 
  //如果有回調函數的話則回調(由當前線程處理)
  if (responseFuture.getInvokeCallback() != null) {
   executeInvokeCallback(responseFuture);
  } else {
   //沒有的話,則喚醒等待線程(由等待線程做處理)
   responseFuture.putResponse(cmd);
   responseFuture.release();
  }
 } else {
  log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
  log.warn(cmd.toString());
 }
}

總結一下,客戶端的處理時序大概是這樣的:

RocketMQ獲取指定消息的實現方法(源碼)

結構大概是這樣的:

RocketMQ獲取指定消息的實現方法(源碼)

3.服務端的處理

//todo 服務端待補充CommitLog文件映射相關內容

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
 
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    processMessageReceived(ctx, msg);
  }
}
 
//from NettyRemotingAbscract.java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
  final RemotingCommand cmd = msg;
  if (cmd != null) {
    switch (cmd.getType()) {
      case REQUEST_COMMAND: //服務端走這里
        processRequestCommand(ctx, cmd);
        break;
      case RESPONSE_COMMAND:
        processResponseCommand(ctx, cmd);
        break;
      default:
        break;
    }
  }
}
 
//from NettyRemotingAbscract.java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
  //查看有無該請求code相關的處理器
  final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
  //如果沒有,則使用默認處理器(可能沒有默認處理器)
  final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
  final int opaque = cmd.getOpaque();
 
  if (pair != null) {
    Runnable run = new Runnable() {
      @Override
      public void run() {
        try {
          doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
          final RemotingResponseCallback callback = new RemotingResponseCallback() {
            @Override
            public void callback(RemotingCommand response) {
              doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
              if (!cmd.isOnewayRPC()) {
                if (response != null) { //不為null,則由本類將響應值寫會給請求方
                  response.setOpaque(opaque);
                  response.markResponseType();
                  try {
                    ctx.writeAndFlush(response);
                  } catch (Throwable e) {
                    log.error("process request over, but response failed", e);
                    log.error(cmd.toString());
                    log.error(response.toString());
                  }
                } else { //為null,意味著processor內部已經將響應處理了,這里無需再處理。
                }
              }
            }
          };
          if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {//QueryMessageProcessor為異步處理器
            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
            processor.asyncProcessRequest(ctx, cmd, callback);
          } else {
            NettyRequestProcessor processor = pair.getObject1();
            RemotingCommand response = processor.processRequest(ctx, cmd);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
            callback.callback(response);
          }
        } catch (Throwable e) {
          log.error("process request exception", e);
          log.error(cmd.toString());
 
          if (!cmd.isOnewayRPC()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
              RemotingHelper.exceptionSimpleDesc(e));
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
          }
        }
      }
    };
 
    if (pair.getObject1().rejectRequest()) {
      final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
      response.setOpaque(opaque);
      ctx.writeAndFlush(response);
      return;
    }
 
    try {
      final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
      pair.getObject2().submit(requestTask);
    } catch (RejectedExecutionException e) {
      if ((System.currentTimeMillis() % 10000) == 0) {
        log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
          + ", too many requests and system thread pool busy, RejectedExecutionException "
          + pair.getObject2().toString()
          + " request code: " + cmd.getCode());
      }
 
      if (!cmd.isOnewayRPC()) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
          "[OVERLOAD]system busy, start flow control for a while");
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
      }
    }
  } else {
    String error = " request type " + cmd.getCode() + " not supported";
    final RemotingCommand response =
      RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
  }
}
 
//from QueryMessageProcesor.java
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
  throws RemotingCommandException {
  switch (request.getCode()) {
    case RequestCode.QUERY_MESSAGE:
      return this.queryMessage(ctx, request);
    case RequestCode.VIEW_MESSAGE_BY_ID: //通過msgId查詢消息
      return this.viewMessageById(ctx, request);
    default:
      break;
  }
 
  return null;
}
 
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
  throws RemotingCommandException {
  final RemotingCommand response = RemotingCommand.createResponseCommand(null);
  final ViewMessageRequestHeader requestHeader =
    (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
 
  response.setOpaque(request.getOpaque());
 
  //getMessagetStore得到當前映射到內存中的CommitLog文件,然后根據偏移量取得數據
  final SelectMappedBufferResult selectMappedBufferResult =
    this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
  if (selectMappedBufferResult != null) {
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
 
    //將響應通過socket寫回給客戶端
    try {
      //response對象的數據作為header
      //消息內容作為body
      FileRegion fileRegion =
        new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
          selectMappedBufferResult);
      ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          selectMappedBufferResult.release();
          if (!future.isSuccess()) {
            log.error("Transfer one message from page cache failed, ", future.cause());
          }
        }
      });
    } catch (Throwable e) {
      log.error("", e);
      selectMappedBufferResult.release();
    }
 
    return null; //如果有值,則直接寫回給請求方。這里返回null是不需要由外層處理響應。
  } else {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
  }
 
  return response;
}

總結

到此這篇關于RocketMQ獲取指定消息的文章就介紹到這了,更多相關RocketMQ獲取指定消息內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://www.cnblogs.com/longfurcat/p/13510106.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 免费看黄色一级大片 | 欧美a视频在线观看 | 素人视频在线观看免费 | 日韩欧美色综合 | 久久久大片 | 中文字幕线观看 | 桥本有菜免费av一区二区三区 | 久久久久久久久久性 | 久久6国产 | 久久久久亚洲精品国产 | 一级毛片真人免费播放视频 | 精品国产91久久久 | 久久精品小短片 | 在线观看国产一区二区三区 | 久久国产乱子伦精品 | 有色视频在线观看 | 竹内纱里奈和大战黑人 | 天天草天天干天天射 | 久久久久久久久免费 | 91色成人 | 日本高清在线播放 | 国产精品免费一区二区三区四区 | 日韩视频在线观看免费视频 | 欧美精品一区自拍a毛片在线视频 | 欧美日韩高清在线观看 | 亚洲白嫩在线观看 | 国产三级国产精品国产普男人 | 久草影音 | 中文字幕在线观看视频一区 | 性明星video另类hd | 国产理论视频在线观看 | 宅男噜噜噜66国产免费观看 | 麻豆视频在线观看 | 久久久久中精品中文字幕19 | 久久情爱网 | 国产精品区一区二区三区 | 成人在线免费观看网址 | 91免费视频版 | 中文字幕视频在线播放 | 亚洲啪| 国产精品久久久久久久久久妇女 |