基于akka怎样实现RPC

发布时间:2021-11-15 23:45:21 作者:柒染
来源:亿速云 阅读:234

这期内容当中小编将会给大家带来有关基于akka怎样实现RPC,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

目前的工作在基于akka实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用),这篇文章将会介绍一种实现方式。
akka rpc java
目录[-]
akka-rpc(基于akka的rpc的实现)
RPC
实现原理
Server端核心代码
Client端核心代码 
Demo
akka-rpc(基于akka的rpc的实现)

代码:http://git.oschina.net/for-1988/Simples

目前的工作在基于akka(java)实现数据服务总线,Akka 2.3中提供了 Cluster Sharing(分片集群)和Persistence功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是RPC(远程过程调用)。

RPC

远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。

实现原理

整个RPC的调用过程完全基于akka来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现java序列化接口。客户端跟服务端其实都是在一个Akka 集群关系中,Client跟Server都是集群中的一个节点。首先Client需要初始化RpcClient对象,在初始化的过程中,我们启动了AkkaSystem,加入到整个集群中,并创建了负责与Server进行通信的Actor。然后通过RpcClient中的getBean(Class<T> clz)方法获取Server端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在RpcBeanProxy中向Server发送CallMethod事件,执行远程实现类的方法,获取返回值给Client。

Server端核心代码

public class RpcServer extends UntypedActor {
         private Map<String, Object> proxyBeans;

    public RpcServer(Map<Class<?>, Object> beans) {
        proxyBeans = new HashMap<String, Object>();
        for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
                .hasNext();) {
            Class<?> inface = iterator.next();
            proxyBeans.put(inface.getName(), beans.get(inface));
        }
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口实现类的实例
            CallBean event = (CallBean) message;
            ReturnBean bean = new ReturnBean(
                    proxyBeans.get(event.getBeanName()), getSelf());
            getSender().tell(bean, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {
            CallMethod event = (CallMethod) message;
            Object bean = proxyBeans.get(event.getBeanName());
            Object[] params = event.getParams();
            List<Class<?>> paraTypes = new ArrayList<Class<?>>();
            Class<?>[] paramerTypes = new Class<?>[] {};
            if (params != null) {
                for (Object param : params) {
                    paraTypes.add(param.getClass());
                }
            }
            Method method = bean.getClass().getMethod(event.getMethodName(),
                    paraTypes.toArray(paramerTypes));
            Object o = method.invoke(bean, params);
            getSender().tell(o, getSelf());
        }
    }

}
启动Server

public static void main(String[] args) {
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2551)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcServer]"))
                .withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("EsbSystem", config);
        
        // Server 加入发布的服务
        Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
        beans.put(ExampleInterface.class, new ExampleInterfaceImpl());
        system.actorOf(Props.create(RpcServer.class, beans), "rpcServer");
    }
Client端核心代码 

RpcClient类型集成了Thread,为了解决一个问题:因为AkkaSystem在加入集群中的时候是异步的,所以我们在第一次new RpcClient对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route中没有Server的Actor,请求会失败。

public class RpcClient extends Thread {

    private ActorSystem system;

    private ActorRef rpc;

    private ActorRef clientServer;

    private static RpcClient instance = null;

    public RpcClient() {
        this.start();
        final Config config = ConfigFactory
                .parseString("akka.remote.netty.tcp.port=" + 2552)
                .withFallback(
                        ConfigFactory
                                .parseString("akka.cluster.roles = [RpcClient]"))
                .withFallback(ConfigFactory.load());
        system = ActorSystem.create("EsbSystem", config);

        int totalInstances = 100;
        Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
        boolean allowLocalRoutees = false;
        ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
                new AdaptiveLoadBalancingGroup(
                        HeapMetricsSelector.getInstance(),
                        Collections.<String> emptyList()),
                new ClusterRouterGroupSettings(totalInstances, routeesPaths,
                        allowLocalRoutees, "RpcServer"));
        rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
        clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
                "client");
        Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集群成功后的回调事件,恢复当前线程的中断
            @Override
            public void run() {
                synchronized (instance) {
                    System.out.println("notify");
                    instance.notify();
                }
            }
        });

    }

    public static RpcClient getInstance() {
        if (instance == null) {
            instance = new RpcClient();
            synchronized (instance) {
                try {   //中断当前线程,等待加入集群成功后,恢复
                    System.out.println("wait");
                    instance.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        return instance;
    }

    public <T> T getBean(Class<T> clz) {
        Future<Object> future = Patterns.ask(clientServer,
                new RpcEvent.CallBean(clz.getName(), clientServer),
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        try {
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            if (o != null) {
                ReturnBean returnBean = (ReturnBean) o;
                return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
                        clientServer, clz);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
RpcClientServer

public class RpcClientServer extends UntypedActor {

    private ActorRef rpc;

    public RpcClientServer(ActorRef rpc) {
        this.rpc = rpc;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof RpcEvent.CallBean) {  //向Server发送CallBean请求
            CallBean event = (CallBean) message;
            Future<Object> future = Patterns.ask(rpc, event, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        } else if (message instanceof RpcEvent.CallMethod) {  //向Server发送方法调用请求
            Future<Object> future = Patterns.ask(rpc, message, new Timeout(
                    Duration.create(5, TimeUnit.SECONDS)));
            Object o = Await.result(future,
                    Duration.create(5, TimeUnit.SECONDS));
            getSender().tell(o, getSelf());
        }
    }
}
RpcBeanProxy,客户端的动态代理类

public class RpcBeanProxy implements InvocationHandler {

    private ActorRef rpcClientServer;

    private Class<?> clz;

    public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
        this.rpcClientServer = rpcClientServer;
        this.clz = clz;
        return Proxy.newProxyInstance(target.getClass().getClassLoader(),
                target.getClass().getInterfaces(), this);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        Object result = null;
        RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
                method.getName(), args, clz.getName());
        Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
                new Timeout(Duration.create(5, TimeUnit.SECONDS)));
        Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
        result = o;
        return result;
    }

}
Demo

Interface,Client和Server都需要这个类,必须实现序列化

public interface ExampleInterface extends Serializable{
    public String sayHello(String name);
}
实现类,只需要Server端存在这个类。

public class ExampleInterfaceImpl implements ExampleInterface {
    @Override
    public String sayHello(String name) {
        System.out.println("Be Called !");
        return "Hello " + name;
    }
}
Client调用

public static void main(String[] args) {
        RpcClient client = RpcClient.getInstance();
        long start = System.currentTimeMillis();
        
        ExampleInterface example = client.getBean(ExampleInterface.class);
        System.out.println(example.sayHello("rpc"));
        
        long time = System.currentTimeMillis() - start;
        System.out.println("time :" + time);
    }
 


这里第一次调用耗时比较长需要46毫秒,akka会对消息进行优化,调用多次以后时间为 1~2毫秒。

上述就是小编为大家分享的基于akka怎样实现RPC了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

推荐阅读:
  1. Akka Actor模拟实现YARN
  2. 如何开始使用 Akka

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

akka rpc

上一篇:Spark的HashPartitioner方式的Python实现是这样的

下一篇:spark-shell如何实现PageRank

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》