激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - java實現(xiàn)樸素rpc

java實現(xiàn)樸素rpc

2023-10-16 07:02未知服務(wù)器之家 Java教程

遠程過程調(diào)用(RPC),比較樸素的說法就是,從某臺機器調(diào)用另一臺機器的一段代碼,并獲取返回結(jié)果。實現(xiàn)了rpc的通信過程,完成度比較高。針對大流量的服務(wù)端還有優(yōu)化空間,比如NIO的使用來管理長連接會更加有效。 五層協(xié)議

遠程過程調(diào)用(RPC),比較樸素的說法就是,從某臺機器調(diào)用另一臺機器的一段代碼,并獲取返回結(jié)果。 實現(xiàn)了rpc的通信過程,完成度比較高。 針對大流量的服務(wù)端還有優(yōu)化空間,比如NIO的使用來管理長連接會更加有效。

五層協(xié)議中,RPC在第幾層?

五層協(xié)議
應(yīng)用層
傳輸層
網(wǎng)絡(luò)層
鏈路層
物理層

我不知道,我要去大氣層!


遠程過程調(diào)用(RPC),比較樸素的說法就是,從某臺機器調(diào)用另一臺機器的一段代碼,并獲取返回結(jié)果。

這之前的一個基層問題就是進程間通信方式(IPC),從是否設(shè)計網(wǎng)絡(luò)通信分為:

  • 基于信號量和共享內(nèi)存實現(xiàn)的管道和消息隊列和其本身(不涉及IP端口)
  • Socket(IP端口)

和共享內(nèi)存不同,Socket實現(xiàn)不并不是只依靠內(nèi)存屏障,它還額外需要物理/虛擬網(wǎng)卡設(shè)備。

關(guān)于網(wǎng)卡,只需要知道網(wǎng)卡可以幫助我們從網(wǎng)絡(luò)中讀寫信息,這也是RPC的基礎(chǔ)。

jRPC實現(xiàn)

遠程過程調(diào)用,不如先來研究調(diào)用。

回聲服務(wù)實現(xiàn)

先來一段普通的代碼。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		return new EchoResponse("echo:" + req.content);
	}

	public static void main(String[] args) throws Exception {
		System.out.println(EchoService.echo(new EchoRequest("ping")).content); // echo:ping
	}
}

class EchoRequest {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

回聲服務(wù)對傳入?yún)?shù)直接返回,就像你在山谷中的回聲一樣。

現(xiàn)在如果使用遠程傳輸,我們需要給網(wǎng)卡注冊自己的IP和端口,以便和服務(wù)端建立連接。連接建立后,我們還需要確定數(shù)據(jù)如何傳輸。

服務(wù)端實現(xiàn)

為了樸素性,我們假設(shè)只有10臺機器和我們進行連接。

public Runnable apply(Integer port) {
	return () -> {
		try {
			try (ServerSocket serverSocket = new ServerSocket(port)) {
				for (;;) {
					Socket clientSocket = serverSocket.accept();
					new Thread(() -> {
						// 數(shù)據(jù)如何傳輸
					}).start();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	};
}

根據(jù)Socket的文檔,我們可以很快迭代出一臺服務(wù)器應(yīng)該如何與他的客戶端連接。對于每個客戶端,我們提供了獨立的線程支持兩臺機器間的長連接。

試想一下,此時的長連接如果是百萬甚至千萬,為每個連接分配一個線程不可取,有什么好辦法可以支持到呢?這個問題這里不解了,有興趣自行研究下。

Serializable

一說起序列化,最怕異口同聲json

使用json就難免會使用到 第三方庫,如果沒有必要,并不希望引入。除了json外,java其實本身就有Serializable實現(xiàn),他和synchronized一樣,java官方提供并維護。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		throw new UnsupportedOperationException();
	}
}

class EchoRequest implements Serializable {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse implements Serializable {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

除了參數(shù)外,一個rpc需要知道,ip、端口、服務(wù)名、方法名。

ip和端口在調(diào)用時應(yīng)該已經(jīng)知道,為此還需要支持一個header來完成服務(wù)名和方法名的指定。

class Header implements Serializable {
	String stub;
	String method;

	public Header(String stub, String method) {
		this.stub = stub;
		this.method = method;
	}
}

通過編碼解碼器對Serializable的數(shù)據(jù)編碼和解碼。

public class Codec {
	Socket clientSocket;
	ObjectInputStream objectInputStream;
	ObjectOutputStream objectOutputStream;

	public Codec(Socket clientSocket)
		throws Exception {
		this.clientSocket = clientSocket;
		this.objectOutputStream = new ObjectOutputStream(clientSocket.getOutputStream());
		this.objectInputStream = new ObjectInputStream(clientSocket.getInputStream());
	}

	public Header header() throws Exception {
		return (Header) this.objectInputStream.readObject();
	}

	public Object read() throws Exception {
		return this.objectInputStream.readObject();
	}

	public void write(Header header, Object obj) throws Exception {
		this.objectOutputStream.writeObject(header);
		this.objectOutputStream.writeObject(obj);
	}
}

回到服務(wù)端,將空缺的地方通過反射補全。

Codec codec = new Codec(clientSocket);
for (;;) {
	Header header = codec.header();
	Class<?> stub = Class.forName(header.stub);
	Map<String, Method> methods = Arrays.asList(stub.getDeclaredMethods()).stream()
		.collect(Collectors.toMap(t -> t.getName(), t -> t));
	Method method = methods.get(header.method);
	codec.write(header, method.invoke(null, header, codec.read()));
}

通過codec解碼stub和method來找到對應(yīng)的方法,調(diào)用對應(yīng)方法,獲取結(jié)果后再通過編碼返回客戶端。

高性能客戶端

想一下,如果一個客戶端發(fā)送了10個請求,其中第2個由于種種原因被阻塞掉,后面的請求會被卡在阻塞的請求之后而無法獲得響應(yīng)。

簡單的處理方法,就是抽象掉調(diào)用過程,并給其唯一標識。需要一個map來存全部的調(diào)用請求。

class Call {
    Long seq;
    Object req;
    Object rsp;
    Thread thread;

    public Call(Long seq, Object req) {
        this.seq = seq;
        this.req = req;
    }
}

對call抽象后,對client也就迎刃而解了。

我知道了,map,用map解。

Long seq;
Codec codec;
ReentrantLock clock;
Map<Long, Call> calls;
ReentrantLock metux;

在map之上提供對seq的操作。

Call register(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.put(seq, call);
		seq++;
		return call;
	} finally {
		clock.unlock();
	}
}

Call remove(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.remove(seq);
		return call;
	} finally {
		clock.unlock();
	}
}

對服務(wù)端的響應(yīng)監(jiān)聽,喚醒阻塞的線程。

void receive() throws Exception {
	for (;;) {
		Header header = codec.header();
		Call call = calls.remove(header.seq);
		Object rsp = codec.read();
		call.rsp = rsp;
		LockSupport.unpark(call.thread);
	}
}

最后就是發(fā)起客戶端調(diào)用的代碼。

FutureTask<Object> start(Header header, Object req) throws Exception {
	Call call = new Call(seq, req);
	try {
		metux.lock();
		final Call fcall = register(call);
		header.seq = call.seq;
		codec.write(header, req);
		FutureTask<Object> task = new FutureTask<>(() -> {
			fcall.thread = Thread.currentThread();
			LockSupport.park();
			return fcall.rsp;
		});
		task.run();
		return task;
	} finally {
		metux.unlock();
	}
}

你好,世界

public static void main(String[] args) throws UnknownHostException, IOException, Exception {
	new Thread(new Server().apply(8080)).start(); // 服務(wù)端啟動
	// 模擬調(diào)用
	ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
	Client client = new Client(new Codec(new Socket("127.0.0.1", 8080)));
	for (int i = 0; i < 100; i++) {
		newFixedThreadPool.submit(() -> {
			try {
				FutureTask<Object> call = client.start(
					new Header("EchoService", "echo"),
					new EchoRequest("~hello"));
				EchoResponse rsp = (EchoResponse) call.get();
				System.out.println(rsp.content);
			} catch (Exception e) {
				e.printStackTrace();
			}
		});
	}
}

Output

RPC echo~hello 0
RPC echo~hello 1
RPC echo~hello 2
RPC echo~hello 3
RPC echo~hello 4
RPC echo~hello 6
RPC echo~hello 5
RPC echo~hello 7
RPC echo~hello 9
RPC echo~hello 8

至此,只是實現(xiàn)了rpc的通信過程,完成度比較高。

  • 針對大流量的服務(wù)端還有優(yōu)化空間,比如NIO的使用來管理長連接會更加有效。
  • 沒有實現(xiàn)注冊中心。

延伸 · 閱讀

精彩推薦
Weibo Article 1 Weibo Article 2 Weibo Article 3 Weibo Article 4 Weibo Article 5 Weibo Article 6 Weibo Article 7 Weibo Article 8 Weibo Article 9 Weibo Article 10 Weibo Article 11 Weibo Article 12 Weibo Article 13 Weibo Article 14 Weibo Article 15 Weibo Article 16 Weibo Article 17 Weibo Article 18 Weibo Article 19 Weibo Article 20 Weibo Article 21 Weibo Article 22 Weibo Article 23 Weibo Article 24 Weibo Article 25
主站蜘蛛池模板: 91久久国产综合久久91猫猫 | 一级黄色免费观看视频 | 成人毛片在线免费看 | 男女视频免费看 | 精品二区在线观看 | 欧美大穴 | 久久精品com | 国产一区二区视频精品 | 亚洲αv| 国产日产精品一区二区三区四区 | 欧美日韩在线看片 | 国产免费视频在线 | 龙床上的呻吟高h | 欧美自拍三区 | 久久在线| 免费黄网站在线播放 | 欧美巨乳在线观看 | 72pao成人国产永久免费视频 | 欧美一级免费视频 | 视频一区二区中文字幕 | 91成人亚洲 | 色婷婷久久久亚洲一区二区三区 | 99这里精品 | 黄色免费入口 | 免费视频a | 日本成人一二三区 | 看片91 | 4399一级成人毛片 | 精品国产精品久久 | 成人精品aaaa网站 | 久久国产精品久久久久久久久久 | 精品久久999 | 国产亚洲欧美一区久久久在 | 久久亚洲第一 | 一级网站 | 中文字幕在线视频网站 | 日产精品久久久一区二区福利 | 免费a级毛片大学生免费观看 | 刘亦菲一区二区三区免费看 | 特级毛片a级毛片100免费 | 一级黄色影院 |