MQ消息中间件 --》RocketMQ --》RocketMQ实战异常汇总 目录 No route info of this topic, xxxTopic CODE: 1 DESC: The broker does not support consumer to filter message by SQL92 MQ控制台查询%DLQ%Topic消息异常 broker配置文件增加aclEnable=true配置开启acl权限认证,MQ控制台Cluster模块异常 CODE: 208 DESC: query message by key finished, but no message. [10015:signature-failed] unable to calculate a request signature. Algorithm HmacSHA1 not available CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE] broker busy, start flow control for a while, period in queue: 203ms, size of queue: 2 No route info of this topic, xxxTopic 按照gitHub上rocketMQ提供的Demo进行生产者发送消息到MQ,出现下面错误 错误原因:没有创建主题,主题不存在,导致MQ路由不到此主题。 解决方案:通过控制台,来创建主题。 还有就是可以在启动broker的时候设定 自动创建Topic nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true & 不建议在生产环境这样配置,(弊端待补充)。 CODE: 1 DESC: The broker does not support consumer to filter message by SQL92 错误原因:目前代理不支持消费者通过SQL92过滤消息 解决方案:在broker的配置文件添加 enablePropertyFilter = true 来支持SQL92方式过滤消息 MQ控制台查询%DLQ%Topic消息异常 模拟消息重试,16次之后消息不在消费,消息已经进入死信队列,但想通过控制台查询死信队列里有没有该消息,输入条件之后 SEARCH 出现异常: { "status": -1, "data": null, "errMsg": "org.apache.rocketmq.client.exception.MQClientException: Can not find Message Queue for this topic, %DLQ%CG_RDATA_CLINICAL_NCT\nSee http://rocketmq.apache.org/docs/faq/ for further details." } 根据异常信息发现是“未找到此主题的消息队列” 在服务端执行命令:sh mqadmin topicList -n localhost:9876,看该topic是否存在,不存在的话那就是服务端没有自动创建死信队列的topic 如果存在,查看该topic的路由信息,执行命令:sh mqadmin topicRoute -n localhost:9876 -t %DLQ%你的死信队列TOPIC 如果看到"perm":2,那就有问题。【至于这个"perm":2 和 "perm":6 是一个权限的问题。】 如果是"perm":2那就改一下执行命令:sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t %DLQ%你的死信队列TOPIC -p 6 如果以上情况都不是,那就在搜索引擎下吧。 broker配置文件增加aclEnable=true配置开启acl权限认证,MQ控制台Cluster模块异常 ACL (Access Control List 访问控制列表) Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: org.apache.rocketmq.acl.common.AclException: request's extFields value is null,org.apache.rocketmq.acl.plain.PlainAccessValidator.parse(PlainAccessValidator.java:57) 具体抛出异常位置的地方 通过下面几行代码就知道了这个ExtFields里面装载是ACCESS_KEY和签名什么的。 在Broker项目下的BrokerController这个类里的initialAcl就是来初始化验证的位置。 MQ控制台是两年前的一个项目,而acl认证是MQ最近4.4.0版本出来的,不排除控制台这边没有进行对其支持。 具体分析: MQ-console开发人员对MQAdmin做了一个切面,控制台上有些模块操作需要读到broker。 通过这个切面发现不单单是cluster这一块,读topic列表时,也到了这个切面。 看下这个切面里的具体做法,黄色线标识区域就是初始化MQAdmin实例的。 就延伸到这个方法里了,来实例化DefaultMQAdminExt,去操作服务端. DefaultMQAdminExt具体的构造函数,提供了权限访问的构造方法。可以发现,在控制台这边没有用带RPCHOOK的构造函数。 哪只要把实例化DefaultMQAdminExt的代码修改一下就好了。 Before change: DefaultMQAdminExt defaultMQAdminExt; if (timeoutMillis > 0) { defaultMQAdminExt = new DefaultMQAdminExt(timeoutMillis); }else { defaultMQAdminExt = new DefaultMQAdminExt(); } After change: DefaultMQAdminExt defaultMQAdminExt; if (timeoutMillis > 0) { defaultMQAdminExt = new DefaultMQAdminExt(getAclRPCHook(),timeoutMillis); }else { defaultMQAdminExt = new DefaultMQAdminExt(getAclRPCHook()); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY)); } ACL_ACCESS_KEY,ACL_SECRET_KEY需要定义一下,写到配置文件,然后从配置文件读取。 最好是把控制台上的所有功能点,都点一遍 看有什么异常,差不多服务端开了acl之后 控制台上有部分功能会异常。 CODE: 208 DESC: query message by key finished, but no message. 在做demo时,服务端都是默认配置,没有aclEnable=true这一项,MQ客户端里的功能都可以正常使用。 当服务端配置加上aclEnable=true,控制台一些功能就异常了...这个就是问题原因所在。MQ控制台是两年前的一个项目,而acl认证是MQ最近4.4.0版本出来的,不排除控制台这边没有进行对其支持更新迭代什么的。 其他的异常问题基本上都是实例化时调用了没有RPCHook的构造函数,只需要在实例化的时候传进去就可以了。但是在消息和消息轨迹这块,消息列表可以查询出来,但是去看消息的具体信息,却还是异常。 MQ版本4.5.0,Console版本1.0。 原因分析: 客户端这边基本上没有什么太多具体实现。rocketmq-client模块基本上都已经实现好了,MQ控制台这边只是对其一个调用。 具体都是调用到了MQAdminImpl、MQAdminAPIImpl但对于消息的具体实现查询还是在MQAdminAPIImpl里的viewMessage、queryMessage方法。基本上操作服务端的接口都在这个实现类里面了。 这个就是具体抛出异常的位置。 先分析viewMessage MQ控制台上消息模块下:TOPIC、MESSAGE KEY、MESSAGE ID去查看消息详情的时候都是调用的viewMessage方法,包括消息轨迹下Message ID 搜索消息。 MQAdmin接口上已经明确指出是要通过offsetMsgId查询消息的。 继续分析下queryMessage,看下具体代码实现 protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException, InterruptedException { // 根据主题查询出主题的详细路由信息 TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic); if (null == topicRouteData) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic); } // 主题路由信息不为空,说明此主题存在,否则抛出异常 if (topicRouteData != null) { // 取出主题所在的broker地址 List<String> brokerAddrs = new LinkedList<String>(); for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { String addr = brokerData.selectBrokerAddr(); if (addr != null) { brokerAddrs.add(addr); } } // broker地址不为空 if (!brokerAddrs.isEmpty()) { final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size()); final List<QueryResult> queryResultList = new LinkedList<QueryResult>(); final ReadWriteLock lock = new ReentrantReadWriteLock(false); // 遍历broker地址去请求查询 for (String addr : brokerAddrs) { try { // 组装请求头 QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader(); requestHeader.setTopic(topic); requestHeader.setKey(key); requestHeader.setMaxNum(maxNum); requestHeader.setBeginTimestamp(begin); requestHeader.setEndTimestamp(end); /** * 具体查询实现实在 rocketmq/remoting/netty/NettyRemotingClient.java invokeAsync方法 * 这里回调了一个线程 */ this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { try { /** * 查询回调,开启aclEnable之后,所执行的查询这里返回的response.getCode都是1=SYSTEM_ERROR * 而进行验证的位置是:rocketmq/broker/BrokerController.java 里的 initialAcl方法 * 具体实现在:rocketmq/acl/plain/PlainPermissionLoader.java 里的 validate 方法 * Check the signature 时候抛出的异常 */ RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { switch (response.getCode()) { case ResponseCode.SUCCESS: { ...省略部分代码 List<MessageExt> wrappers = MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true); // 取出查询的消息 QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers); try { lock.writeLock().lock(); queryResultList.add(qr); } finally { lock.writeLock().unlock(); } break; } default: log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark()); break; } } else { log.warn("getResponseCommand return null"); } } finally { countDownLatch.countDown(); } } }, isUniqKey); } catch (Exception e) { log.warn("queryMessage exception", e); } } boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS); if (!ok) { log.warn("queryMessage, maybe some broker failed"); } long indexLastUpdateTimestamp = 0; List<MessageExt> messageList = new LinkedList<MessageExt>(); // 遍历消息列表 for (QueryResult qr : queryResultList) { if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) { indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp(); } // 遍历每一个消息 for (MessageExt msgExt : qr.getMessageList()) { // 是否通过uniqkey去查询消息 if (isUniqKey) { ...省略部分代码 } else { ...省略部分代码 } } } } // 最终返回消息体 if (!messageList.isEmpty()) { return new QueryResult(indexLastUpdateTimestamp, messageList); } else { throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message."); } } } throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info"); } 但只是对消息查询结果的一个封装,而具体实现还是得从底层实现去看。 继续深入remotingClient.invokeAsync()的实现,在具体实现里面会发现有这么一行代码需要执行 doBeforeRpcHooks() 这个就是在调用之前进行的一个权限钩子的组装。 为什么要看这个呢?因为通过响应的错误信息得知是签名认证失败了,所以要深入去看一下在调用前对这个rpcHooks做的事情。 具体的异常信息看下: 2019-07-12 12:41:06 ERROR QueryMessageThread_7 - process request exception org.apache.rocketmq.acl.common.AclException: Check signature failed for accessKey=rocketmq2 at org.apache.rocketmq.acl.plain.PlainPermissionLoader.validate(PlainPermissionLoader.java:210) ~[rocketmq-acl-4.5.0.jar:4.5.0] at org.apache.rocketmq.acl.plain.PlainAccessValidator.validate(PlainAccessValidator.java:132) ~[rocketmq-acl-4.5.0.jar:4.5.0] at org.apache.rocketmq.broker.BrokerController$10.doBeforeRequest(BrokerController.java:510) ~[rocketmq-broker-4.5.0.jar:4.5.0] at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.doBeforeRpcHooks(NettyRemotingAbstract.java:172) ~[rocketmq-remoting-4.5.0.jar:4.5.0] at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:202) ~[rocketmq-remoting-4.5.0.jar:4.5.0] at org.apache.rocketmq.remoting.netty.RequestTask.run(RequestTask.java:80) [rocketmq-remoting-4.5.0.jar:4.5.0] 看下客户端签名具体做法: public void doBeforeRequest(String remoteAddr, RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); request.addExtField(SIGNATURE, signature); request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } } // 从request.readCustomHeader()取出属性值,组装加密map protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) { CommandCustomHeader header = request.readCustomHeader(); // Sort property SortedMap<String, String> map = new TreeMap<String, String>(); map.put(ACCESS_KEY, ak); if (securityToken != null) { map.put(SECURITY_TOKEN, securityToken); } ...省略部分代码... return map; } catch (Exception e) { throw new RuntimeException("incompatible exception.", e); } } 等签名加密做完之后,在真正调用服务器端前会把request.readCustomHeader()里的属性值给添加到request的ExtFileds扩展区里。而服务端的加密方式直接取出扩展区内容进行签名加密 服务器端的签名生成的Map // Content SortedMap<String, String> map = new TreeMap<String, String>(); for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) { if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) { map.put(entry.getKey(), entry.getValue()); } } accessResource.setContent(AclUtils.combineRequestContent(request, map)); 最终在签名校验时不一致,抛出异常。 // Check the signature String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); } 具体原因就是在queryMessage,往request扩展区字段添加了UNIQUE_MSG_QUERY_FLAG属性值,客户端生成签名时没有将这个字段取出进行做签名生成,而服务端生成签名时扩展区字段全部取出进行签名的生成。 其实说白了就是两端签名生成的取值范围不一致,而导致的。等追查到问题的具体原因之后在github上Apache·rocketMQ的Issuse上发现了同样的问题。 Issuse地址:https://github.com/apache/rocketmq/issues/1216 分析的思路都很相似。 签名不一致的问题修复:将rocketmq/acl/AclClientRPCHook.java parseRequestContent()的方法调整: 然后将acl这个模块单独打成jar,放到rocketmq-console项目里。pom先需要排除别的jar对acl的引用。 消息ID查询问题修复:基本上签名通过之后控制台上消息这个模块都可以正常使用了。还只有消息轨迹的调用有问题,需要调整控制台里的代码实现 消息轨迹这里的查看详情,也是同样,方法的具体调用不对,只需要将控制台的这个查询方法重写下,调用到真正的key查询即可。 针对消息轨迹这里。可以查到消息,但消息的详情看不到的问题重申下: 这个是消息跟踪详情的调用controller层,其实去查询不是你控制台上的topic而是先查找你有么有自定义消息跟踪的topic,如果没有就使用系统默认的 然后根据消息的ID或者key去你自定义的消息跟踪topic或者系统默认的消息跟踪topic查询消息。查询不到就会出现控制台的异常,什么by key no message。查不到的原因就是因为在消息发送或者消费消息的时候往消息跟踪topic写消息没写入进去,而写不进去消息,就是服务端开启了权限,往消息跟踪topic写消息的时候异常了。而这个异常没有具体被抛出来,是为了不影响主消息的发送。具体分析看我博客《RocketMQ进阶(一)》里对生产者发消息的一个刨根问底。 我这里出现问题的原因就是服务端开启了aclEnable=true 在配置conf/plain_acl.yml 文件的时候,对于默认的topic没有配置权限,默认是DENY拒绝的。要配置为PUB|SUB权限。 [10015:signature-failed] unable to calculate a request signature. Algorithm HmacSHA1 not available 算法HmacSHA1不可用。rocketMQ版本4.5.0 console版本 1.0.0 同样broker开启 aclEnable=true,服务部署启动之后,通过MQ控制台访问会出现这么一个异常。 搜索引擎之后发现和github上apache/rocketmq Issuse 1134问题一样。 按照PR上的修改,改了之后重启mq服务端,发现没有效果,还是同样的问题。 挣扎了两天终于在下班前搞定了。 直接说解决方案吧 进入到rocketmq安装目录 bin 目录同级 执行 echo $JAVA_HOME 如果没有输出结果 配置JDK环境变量,直到在你rocketmq安装目录下执行 echo $JAVA_HOME 有输出结果为止! CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE] broker busy, start flow control for a while, period in queue: 203ms, size of queue: 2 在做对MQ压测的时候,所发生的异常,在一开始是用jmeter进行http请求压测的,会受到一个tomcat连接数的限制,50个线程每个线程1000,MQ的TPS达到3000+,无异常。这样可能压测不出来什么,故用了一段代码进行压测。 压测代码 public String syncSendMsg2MQBatch(){ ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { Thread myThread = new Thread(new Runnable() { @Override public void run() { try { // 发送目的地 TOPIC:TAG String destination = topic + ":" + tag; for (int i = 0; i < 10000; i++) { // 发送的消息.消息体必须存在 String msg = "Hello World " + i + " 》" + LocalDateTime.now(); // 业务主键作为消息key String key = i+""; // 构建消息 Message message = MessageBuilder.withPayload(msg) .setHeader(RocketMQHeaders.KEYS, key) .build(); SendResult sendResult = rocketMQTemplate.syncSend(destination, message); if(sendResult.getSendStatus() == SendStatus.SEND_OK){ System.out.println(sendResult.getMsgId() + " : " + sendResult.getSendStatus()); } } } catch (Exception e) { e.printStackTrace(); } } }); executorService.execute(myThread); } return " 》》Send Msg Success 》》"; } 5个线程,每个线程1000,MQ的TPS最多900+就结束了。 调大每个线程执行的数为10000,控制台打印出异常。 机器配置:M:8核16G S:32核128G JVM配置:-server -Xms1g -Xmx1g -Xmn512m 至于发生的原因自行搜索引擎一下,解释的很多也很详细。 推荐:https://blog.csdn.net/prestigeding/article/details/92800672 解决方案:https://stackoverflow.com/questions/47749906/rocketmq-throw-exception-timeout-clean-queuebroker-busy-start-flow-control-f broker配置新增或者修改 sendMessageThreadPoolNums=32 useReentrantLockWhenPutMessage=true 在查询MQ文档时发现的,应该会更好的解释上面两个参数的调整了吧。 sendMessageThreadPoolNums=16 5个线程,单个线程10000 没问题。 线程增加到10个就会出现异常。 至于这些参数配置,还是要经过一系列的压测之后 得到一个与机器相吻合的一个参数值。 ———————————————— 版权声明:本文为CSDN博主「「已注销」」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/weixin_43439073/article/details/95746775 |
|手机版|小黑屋|梦想之都-俊月星空
( 粤ICP备18056059号 )|网站地图
GMT+8, 2025-7-1 18:54 , Processed in 0.038593 second(s), 19 queries .
Powered by Mxzdjyxk! X3.5
© 2001-2025 Discuz! Team.