欧美亚洲中文,在线国自产视频,欧洲一区在线观看视频,亚洲综合中文字幕在线观看

      1. <dfn id="rfwes"></dfn>
          <object id="rfwes"></object>
        1. 站長(zhǎng)資訊網(wǎng)
          最全最豐富的資訊網(wǎng)站

          Java 手寫一個(gè)RPC框架

          Java 手寫一個(gè)RPC框架

          RPC框架稱為遠(yuǎn)程調(diào)用框架,其實(shí)現(xiàn)的核心原理就是消費(fèi)者端使用動(dòng)態(tài)代理來(lái)代理一個(gè)接口的方法(基于JDK的動(dòng)態(tài)代理,當(dāng)然如果使用CGLib可以直接使用無(wú)接口類的方法),通過(guò)加入網(wǎng)絡(luò)傳輸編程,傳輸調(diào)用接口方法名稱,方法參數(shù)來(lái)給提供者獲取,再通過(guò)反射,來(lái)執(zhí)行該接口的方法,再將反射執(zhí)行的結(jié)果通過(guò)網(wǎng)絡(luò)編程傳回消費(fèi)者端。

          現(xiàn)在我們來(lái)依次實(shí)現(xiàn)這些概念。這里我們做最簡(jiǎn)單的實(shí)現(xiàn),網(wǎng)絡(luò)編程使用的是BIO,大家可以使用Reactor模式的Netty來(lái)改寫性能更好的方式。而網(wǎng)絡(luò)傳輸中使用的序列化和反序列化也是Java自帶的,當(dāng)然這樣的傳輸字節(jié)比較大,可以使用google的protoBuffer或者kryo來(lái)處理。這里只為了方便說(shuō)明原理。

          pom

          <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">     <modelVersion>4.0.0</modelVersion>      <groupId>com.guanjian</groupId>     <artifactId>rpc-framework</artifactId>     <version>1.0-SNAPSHOT</version>          <build>         <plugins>             <plugin>                 <groupId>org.apache.maven.plugins</groupId>                 <artifactId>maven-compiler-plugin</artifactId>                 <version>3.7.0</version>                 <configuration>                     <source>1.8</source>                     <target>1.8</target>                     <encoding>UTF-8</encoding>                 </configuration>             </plugin>         </plugins>     </build> </project>

          首先當(dāng)然是我們要進(jìn)行遠(yuǎn)程調(diào)用的接口以及接口的方法。

          public interface HelloService {     String sayHello(String content);}

          接口實(shí)現(xiàn)類

          public class HelloServiceImpl implements HelloService {    public String sayHello(String content) {        return "hello," + content;    } }

          消費(fèi)者端的動(dòng)態(tài)代理,如果你是把提供者和消費(fèi)者寫在兩個(gè)工程中,則提供者端需要上面的接口和實(shí)現(xiàn)類,而消費(fèi)者端只需要上面的接口。

          public class ConsumerProxy {     /**      * 消費(fèi)者端的動(dòng)態(tài)代理      * @param interfaceClass 代理的接口類      * @param host 遠(yuǎn)程主機(jī)IP      * @param port 遠(yuǎn)程主機(jī)端口      * @param <T>      * @return      */     @SuppressWarnings("unchecked")     public static <T> T consume(final Class<T> interfaceClass,final String host,final int port) {         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),                 new Class[]{interfaceClass}, (proxy,method,args) -> {                     //創(chuàng)建一個(gè)客戶端套接字                     Socket socket = new Socket(host, port);                     try {                         //創(chuàng)建一個(gè)對(duì)外傳輸?shù)膶?duì)象流,綁定套接字                         ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());                         try {                             //將動(dòng)態(tài)代理的方法名寫入對(duì)外傳輸?shù)膶?duì)象流中                             output.writeUTF(method.getName());                             //將動(dòng)態(tài)代理的方法的參數(shù)寫入對(duì)外傳輸?shù)膶?duì)象流中                             output.writeObject(args);                             //創(chuàng)建一個(gè)對(duì)內(nèi)傳輸?shù)膶?duì)象流,綁定套接字                             //這里是為了獲取提供者端傳回的結(jié)果                             ObjectInputStream input = new ObjectInputStream(socket.getInputStream());                             try {                                 //從對(duì)內(nèi)傳輸?shù)膶?duì)象流中獲取結(jié)果                                 Object result = input.readObject();                                 if (result instanceof Throwable) {                                     throw (Throwable) result;                                 }                                 return result;                             } finally {                                 input.close();                             }                         } finally {                             output.close();                         }                     } finally {                         socket.close();                     }                 }         );     } }

          有關(guān)JDK動(dòng)態(tài)代理的內(nèi)容可以參考AOP原理與自實(shí)現(xiàn) ,BIO的部分可以參考傳統(tǒng)IO與NIO比較

          提供者端的網(wǎng)絡(luò)傳輸和遠(yuǎn)程方式調(diào)用服務(wù)

          public class ProviderReflect {     private static final ExecutorService executorService = Executors.newCachedThreadPool();      /**      * RPC監(jiān)聽(tīng)和遠(yuǎn)程方法調(diào)用      * @param service RPC遠(yuǎn)程方法調(diào)用的接口實(shí)例      * @param port 監(jiān)聽(tīng)的端口      * @throws Exception      */     public static void provider(final Object service,int port) throws Exception {         //創(chuàng)建服務(wù)端的套接字,綁定端口port         ServerSocket serverSocket = new ServerSocket(port);         while (true) {             //開(kāi)始接收客戶端的消息,并以此創(chuàng)建套接字             final Socket socket = serverSocket.accept();             //多線程執(zhí)行,這里的問(wèn)題是連接數(shù)過(guò)大,線程池的線程數(shù)會(huì)耗盡             executorService.execute(() -> {                 try {                     //創(chuàng)建呢一個(gè)對(duì)內(nèi)傳輸?shù)膶?duì)象流,并綁定套接字                     ObjectInputStream input = new ObjectInputStream(socket.getInputStream());                     try {                         try {                             //從對(duì)象流中讀取接口方法的方法名                             String methodName = input.readUTF();                             //從對(duì)象流中讀取接口方法的所有參數(shù)                             Object[] args = (Object[]) input.readObject();                             Class[] argsTypes = new Class[args.length];                             for (int i = 0;i < args.length;i++) {                                 argsTypes[i] = args[i].getClass();                              }                             //創(chuàng)建一個(gè)對(duì)外傳輸?shù)膶?duì)象流,并綁定套接字                             //這里是為了將反射執(zhí)行結(jié)果傳遞回消費(fèi)者端                             ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());                             try {                                 Class<?>[] interfaces = service.getClass().getInterfaces();                                 Method method = null;                                 for (int i = 0;i < interfaces.length;i++) {                                     method = interfaces[i].getDeclaredMethod(methodName,argsTypes);                                     if (method != null) {                                         break;                                     }                                 }                                 Object result = method.invoke(service, args);                                 //將反射執(zhí)行結(jié)果寫入對(duì)外傳輸?shù)膶?duì)象流中                                 output.writeObject(result);                             } catch (Throwable t) {                                 output.writeObject(t);                             } finally {                                 output.close();                             }                         } catch (Exception e) {                             e.printStackTrace();                         } finally {                             input.close();                         }                     } finally {                         socket.close();                     }                 } catch (Exception e) {                     e.printStackTrace();                 }             });         }     } }

          啟動(dòng)提供者端的網(wǎng)絡(luò)偵聽(tīng)和遠(yuǎn)程調(diào)用

          public class RPCProviderMain {     public static void main(String[] args) throws Exception {         HelloService service = new HelloServiceImpl();         ProviderReflect.provider(service,8083);     } }

          啟動(dòng)消費(fèi)者的動(dòng)態(tài)代理調(diào)用

          public class RPCConsumerMain {     public static void main(String[] args) throws InterruptedException {         HelloService service = ConsumerProxy.consume(HelloService.class,"127.0.0.1",8083);         for (int i = 0;i < 1000;i++) {             String hello = service.sayHello("你好_" + i);             System.out.println(hello);             Thread.sleep(1000);         }     } }

          運(yùn)行結(jié)果

          hello,你好_0
          hello,你好_1
          hello,你好_2
          hello,你好_3
          hello,你好_4
          hello,你好_5

          …..

          如果你要擴(kuò)展成一個(gè)Netty+ProtoBuffer的高性能RPC框架可以參考Netty整合Protobuffer 的相關(guān)寫法。

          推薦教程:《PHP》

          贊(0)
          分享到: 更多 (0)
          網(wǎng)站地圖   滬ICP備18035694號(hào)-2    滬公網(wǎng)安備31011702889846號(hào)