您好,登录后才能下订单哦!
这篇文章将为大家详细讲解有关RocketMQ中怎么实现权限控制,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
1、简单使用
ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?
用户:用户是访问控制的基础要素,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。 资源:需要保护的对象,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。 权限:针对资源,能进行的操作。 角色:RocketMQ中,只定义两种角色:是否是管理员。
acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下
需要使用acl必须在服务端开启此功能,在Broker的配置文件中配置,aclEnable = true开启此功能
配置plain_acl.yml文件
globalWhiteRemoteAddresses: - 10.10.15.* - 192.168.0.* accounts: - accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192.168.1.* # if it is admin, it could access all resources admin: true
下面我们介绍一下plain_acl.yml文件中相关的参数含义及使用
字段 | 取值 | 含义 |
---|---|---|
globalWhiteRemoteAddresses | *;192.168.*.*;192.168.0.1 | 全局IP白名单 |
accessKey | 字符串 | Access Key 用户名 |
secretKey | 字符串 | Secret Key 密码 |
whiteRemoteAddress | *;192.168.*.*;192.168.0.1 | 用户IP白名单 |
admin | true;false | 是否管理员账户 |
defaultTopicPerm | DENY;PUB;SUB;PUB|SUB | 默认的Topic权限 |
defaultGroupPerm | DENY;PUB;SUB;PUB|SUB | 默认的ConsumerGroup权限 |
topicPerms | topic=权限 | 各个Topic的权限 |
groupPerms | group=权限 | 各个ConsumerGroup的权限 |
权限标识符的含义
权限 | 含义 |
---|---|
DENY | 拒绝 |
ANY | PUB 或者 SUB 权限 |
PUB | 发送权限 |
SUB | 订阅权限 |
处理流程
特殊的请求例如 UPDATE_AND_CREATE_TOPIC 等,只能由 admin 账户进行操作;
对于某个资源,如果有显性配置权限,则采用配置的权限;如果没有显性配置权限,则采用默认的权限
RocketMQ的权限控制存储的默认实现是基于yml配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点
如果ACL与高可用部署(Master/Slave架构)同时启用,那么需要在Broker Master节点的${ROCKETMQ_HOME}/store/conf/plain_acl.yml配置文件中 设置全局白名单信息,即为将Slave节点的ip地址设置至Master节点plain_acl.yml配置文件的全局白名单中
public class AclProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook()); producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); producer.start(); for (int i = 0; i < 10; i++) { try { Message msg = new Message("topicA" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看结果
报错提示topicA没有权限,我们在plain_acl.yml文件中配置的也确实是RocketMQ用户拒绝,生产消费topicA主题信息,我们改变主题为topicB,则发现发送消息成功,topicB=PUB|SUB设置的权限是生产消费都可以。
查看结果
public class AclConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupA", getAclRPCHook(),new AllocateMessageQueueAveragely()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("topicB", "*"); consumer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("RocketMQ","12345678")); } }
查看结果:发现没有任何消息被消费,也没有报错信息,对于RocketMQ用户topicB设置的就是可以可以生产可以消费的,但是我们发现其groupA=DENY是拒绝的,说明消费组是groupA则拒绝消费任何消息,我们改成groupB或者groupC查看结果。
Broker端ACL原理图
Broker服务启动时创建BrokerController并初始化initialize()时调用acl相关的初始化方法initialAcl()
private void initialAcl() { //broker配置文件中是否开启ACL功能,默认关闭 if (!this.brokerConfig.isAclEnable()) { log.info("The broker dose not enable acl"); return; } //获取权限访问校验器的列表,加载的META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中指向 //org.apache.rocketmq.acl.plain.PlainAccessValidator,默认只有一个 List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); if (accessValidators == null || accessValidators.isEmpty()) { log.info("The broker dose not load the AccessValidator"); return; } for (AccessValidator accessValidator: accessValidators) { final AccessValidator validator = accessValidator; //注册服务端就的“钩子”对象,对权限进行校验 this.registerServerRPCHook(new RPCHook() { @Override public void doBeforeRequest(String remoteAddr, RemotingCommand request) { //Do not catch the exception validator.validate(validator.parse(request, remoteAddr)); } @Override public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) { } }); } }
源码中有相关的注解,我们查看一下注册registerServerRPCHook方法
public void registerServerRPCHook(RPCHook rpcHook) { //服务端的NettyRemotingServer服务注册“钩子”函数 getRemotingServer().registerRPCHook(rpcHook); this.fastRemotingServer.registerRPCHook(rpcHook); }
关于NettyRemotingServer服务和NettyRemotingClient服务配合使用,后面章节RocketMQ Remoting会重点分析
PlainAccessValidator.parse(),根据客户端不同的请求Code其需要的检验资源也不一样
switch (request.getCode()) { //发送消息需要校验当前的账户的topic是否具有PUB权限 case RequestCode.SEND_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; //拉取消息时需要知道该consumer账户下拉取的topic是否具有SUB权限,并且还要知道订阅组consumerGroup是否有sub权限 case RequestCode.PULL_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; }
根据request.getCode()获取当前的操作需要的权限标识集合,供后面与系统的权限配置文件plain_acl.yml中的权限标识符校验时使用
Broker初始化相关服务的时候创建了PlainAccessValidator,我们发现其默认的构造方法中调用了其权限资源加载器PlainPermissionLoader
public PlainAccessValidator() { aclPlugEngine = new PlainPermissionLoader(); }
创建PlainPermissionLoader对象
public PlainPermissionLoader() { //加载服务端的权限文件plain_acl.yml load(); //开启线程每500ms检测权限文件是否改变,若改变则执行load()从新加载权限文件 watch(); }
查看load方法流程
public void load() { Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>(); List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); JSONObject plainAclConfData = AclUtils.getYamlDataObject(fileHome + File.separator + fileName, JSONObject.class); if (plainAclConfData == null || plainAclConfData.isEmpty()) { throw new AclException(String.format("%s file is not data", fileHome + File.separator + fileName)); } log.info("Broker plain acl conf data is : ", plainAclConfData.toString()); //获取全局白名单IP集合 JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) { for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) { globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory. getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i))); } } //获取账户权限集合 JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) { List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class); for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) { //构建每个账户的权限资源 PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig); //放入Map中AccessKey作为key,该账户的权限资源作为value plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource); } } this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap; }
加载资源文件,解析其中的权限标识,等待权限校验器PlainAccessValidator调用其validate()对权限校验
核心的校验方法PlainPermissionLoader.validate()
public void validate(PlainAccessResource plainAccessResource) { //全局的白名单IP进行校验 for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { //匹配成功说明是全局的白名单IP,具有所有权限,直接返回。 if (remoteAddressStrategy.match(plainAccessResource)) { return; } } //判断用户名是否为空,null则抛出AclException异常 if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } //校验账户是否存在于服务端的权限资源文件中plain_acl.yml,不在则抛出异常 if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); //检查该账户的白名单IP是否匹配上客户端IP,匹配成功具有所有权限,除UPDATE_AND_CREATE_TOPIC等特殊权限需要管理员权限 if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; } //校验签名 String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); } //校验账户内的资源权限 checkPerm(plainAccessResource, ownedAccess); }
查看其对于当前账户内部的资源校验
void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) { //判断请求的命令的Code是否需要管理员权限,并判断该用户是否是管理员 if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); } Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); //判断是否是group,在构建resourcePermMap时候,group的key=RETRY_GROUP_TOPIC_PREFIX + consumerGroup boolean isGroup = PlainAccessResource.isRetryTopic(resource); //系统的权限配置文件中配置项包不含该客户端命令请求需要的权限 if (!ownedPermMap.containsKey(resource)) { //判断其是否是topic还是group的权限标识,获取该类型的全局的权限是什么 byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : needCheckedAccess.getDefaultTopicPerm(); //核对权限 if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } //系统的权限配置文件中配置项包含该客户端命令请求需要的权限,则直接判断其权限 if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } } }
所有的检验流程如果有一项不满足则抛出AclException异常
上面图中只是分析了Broker服务端的处理流程,客户端如何调用我们具体分析下我们以发送消息为例:
我们之前分析过Producer的消息发送的核心方法是DefaultMQProducerImpl.sendKernelImpl()该方法
//是否注册了“钩子” if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } //封装其ACL请求的参数信息 this.executeSendMessageHookBefore(context); }
hasSendMessageHook(),我们在构建Producer的时候创建了该对象,加入到DefaultMQProducerImpl的sendMessageHookList属性中。
我们查看其发送消息NettyRemotingClient类中调用AclClientRPCHook.doBeforeRequest()发送前的数据准备
public void doBeforeRequest(String remoteAddr, RemotingCommand request) { byte[] total = AclUtils.combineRequestContent(request, parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); request.addExtField(SIGNATURE, signature); request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one. if (sessionCredentials.getSecurityToken() != null) { request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken()); } }
关于RocketMQ中怎么实现权限控制就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。