五層協(xié)議中,RPC在第幾層?
五層協(xié)議 |
---|
應(yīng)用層 |
傳輸層 |
網(wǎng)絡(luò)層 |
鏈路層 |
物理層 |
我不知道,我要去大氣層!
遠程過程調(diào)用(RPC),比較樸素的說法就是,從某臺機器調(diào)用另一臺機器的一段代碼,并獲取返回結(jié)果。
這之前的一個基層問題就是進程間通信方式(IPC),從是否設(shè)計網(wǎng)絡(luò)通信分為:
- 基于信號量和共享內(nèi)存實現(xiàn)的管道和消息隊列和其本身(不涉及IP端口)
- Socket(IP端口)
和共享內(nèi)存不同,Socket實現(xiàn)不并不是只依靠內(nèi)存屏障,它還額外需要物理/虛擬網(wǎng)卡設(shè)備。
關(guān)于網(wǎng)卡,只需要知道網(wǎng)卡可以幫助我們從網(wǎng)絡(luò)中讀寫信息,這也是RPC的基礎(chǔ)。
jRPC實現(xiàn)
遠程過程調(diào)用,不如先來研究調(diào)用。
回聲服務(wù)實現(xiàn)
先來一段普通的代碼。
public class EchoService {
public static EchoResponse echo(EchoRequest req) throws Exception {
return new EchoResponse("echo:" + req.content);
}
public static void main(String[] args) throws Exception {
System.out.println(EchoService.echo(new EchoRequest("ping")).content); // echo:ping
}
}
class EchoRequest {
String content;
public EchoRequest(String content) {
this.content = content;
}
}
class EchoResponse {
String content;
public EchoResponse() {
}
public EchoResponse(String content) {
this.content = content;
}
}
回聲服務(wù)對傳入?yún)?shù)直接返回,就像你在山谷中的回聲一樣。
現(xiàn)在如果使用遠程傳輸,我們需要給網(wǎng)卡注冊自己的IP和端口,以便和服務(wù)端建立連接。連接建立后,我們還需要確定數(shù)據(jù)如何傳輸。
服務(wù)端實現(xiàn)
為了樸素性,我們假設(shè)只有10臺機器和我們進行連接。
public Runnable apply(Integer port) {
return () -> {
try {
try (ServerSocket serverSocket = new ServerSocket(port)) {
for (;;) {
Socket clientSocket = serverSocket.accept();
new Thread(() -> {
// 數(shù)據(jù)如何傳輸
}).start();
}
}
} catch (Exception e) {
e.printStackTrace();
}
};
}
根據(jù)Socket的文檔,我們可以很快迭代出一臺服務(wù)器應(yīng)該如何與他的客戶端連接。對于每個客戶端,我們提供了獨立的線程支持兩臺機器間的長連接。
試想一下,此時的長連接如果是百萬甚至千萬,為每個連接分配一個線程不可取,有什么好辦法可以支持到呢?這個問題這里不解了,有興趣自行研究下。
Serializable
一說起序列化,最怕異口同聲json
。
使用json
就難免會使用到 第三方庫
,如果沒有必要,并不希望引入。除了json
外,java其實本身就有Serializable實現(xiàn),他和synchronized一樣,java官方提供并維護。
public class EchoService {
public static EchoResponse echo(EchoRequest req) throws Exception {
throw new UnsupportedOperationException();
}
}
class EchoRequest implements Serializable {
String content;
public EchoRequest(String content) {
this.content = content;
}
}
class EchoResponse implements Serializable {
String content;
public EchoResponse() {
}
public EchoResponse(String content) {
this.content = content;
}
}
除了參數(shù)外,一個rpc需要知道,ip、端口、服務(wù)名、方法名。
ip和端口在調(diào)用時應(yīng)該已經(jīng)知道,為此還需要支持一個header來完成服務(wù)名和方法名的指定。
class Header implements Serializable {
String stub;
String method;
public Header(String stub, String method) {
this.stub = stub;
this.method = method;
}
}
通過編碼解碼器對Serializable的數(shù)據(jù)編碼和解碼。
public class Codec {
Socket clientSocket;
ObjectInputStream objectInputStream;
ObjectOutputStream objectOutputStream;
public Codec(Socket clientSocket)
throws Exception {
this.clientSocket = clientSocket;
this.objectOutputStream = new ObjectOutputStream(clientSocket.getOutputStream());
this.objectInputStream = new ObjectInputStream(clientSocket.getInputStream());
}
public Header header() throws Exception {
return (Header) this.objectInputStream.readObject();
}
public Object read() throws Exception {
return this.objectInputStream.readObject();
}
public void write(Header header, Object obj) throws Exception {
this.objectOutputStream.writeObject(header);
this.objectOutputStream.writeObject(obj);
}
}
回到服務(wù)端,將空缺的地方通過反射補全。
Codec codec = new Codec(clientSocket);
for (;;) {
Header header = codec.header();
Class<?> stub = Class.forName(header.stub);
Map<String, Method> methods = Arrays.asList(stub.getDeclaredMethods()).stream()
.collect(Collectors.toMap(t -> t.getName(), t -> t));
Method method = methods.get(header.method);
codec.write(header, method.invoke(null, header, codec.read()));
}
通過codec解碼stub和method來找到對應(yīng)的方法,調(diào)用對應(yīng)方法,獲取結(jié)果后再通過編碼返回客戶端。
高性能客戶端
想一下,如果一個客戶端發(fā)送了10個請求,其中第2個由于種種原因被阻塞掉,后面的請求會被卡在阻塞的請求之后而無法獲得響應(yīng)。
簡單的處理方法,就是抽象掉調(diào)用過程,并給其唯一標識。需要一個map來存全部的調(diào)用請求。
class Call {
Long seq;
Object req;
Object rsp;
Thread thread;
public Call(Long seq, Object req) {
this.seq = seq;
this.req = req;
}
}
對call抽象后,對client也就迎刃而解了。
我知道了,map,用map解。
Long seq;
Codec codec;
ReentrantLock clock;
Map<Long, Call> calls;
ReentrantLock metux;
在map之上提供對seq的操作。
Call register(Call call) {
try {
clock.lock();
call.seq = seq;
calls.put(seq, call);
seq++;
return call;
} finally {
clock.unlock();
}
}
Call remove(Call call) {
try {
clock.lock();
call.seq = seq;
calls.remove(seq);
return call;
} finally {
clock.unlock();
}
}
對服務(wù)端的響應(yīng)監(jiān)聽,喚醒阻塞的線程。
void receive() throws Exception {
for (;;) {
Header header = codec.header();
Call call = calls.remove(header.seq);
Object rsp = codec.read();
call.rsp = rsp;
LockSupport.unpark(call.thread);
}
}
最后就是發(fā)起客戶端調(diào)用的代碼。
FutureTask<Object> start(Header header, Object req) throws Exception {
Call call = new Call(seq, req);
try {
metux.lock();
final Call fcall = register(call);
header.seq = call.seq;
codec.write(header, req);
FutureTask<Object> task = new FutureTask<>(() -> {
fcall.thread = Thread.currentThread();
LockSupport.park();
return fcall.rsp;
});
task.run();
return task;
} finally {
metux.unlock();
}
}
你好,世界
public static void main(String[] args) throws UnknownHostException, IOException, Exception {
new Thread(new Server().apply(8080)).start(); // 服務(wù)端啟動
// 模擬調(diào)用
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
Client client = new Client(new Codec(new Socket("127.0.0.1", 8080)));
for (int i = 0; i < 100; i++) {
newFixedThreadPool.submit(() -> {
try {
FutureTask<Object> call = client.start(
new Header("EchoService", "echo"),
new EchoRequest("~hello"));
EchoResponse rsp = (EchoResponse) call.get();
System.out.println(rsp.content);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
Output
RPC echo~hello 0
RPC echo~hello 1
RPC echo~hello 2
RPC echo~hello 3
RPC echo~hello 4
RPC echo~hello 6
RPC echo~hello 5
RPC echo~hello 7
RPC echo~hello 9
RPC echo~hello 8
至此,只是實現(xiàn)了rpc的通信過程,完成度比較高。
- 針對大流量的服務(wù)端還有優(yōu)化空間,比如NIO的使用來管理長連接會更加有效。
- 沒有實現(xiàn)注冊中心。