Remote Process Call

之前见到的RPC都是Remote Procedure Call, 比如Java的RMI(Remote Method Invocation)。 而最近的项目使用了Remote Process Call/远程程序调用, 也就是用主机A调用主机B上的程序,并返回执行结果。 这有些类似SSH的功能,只不过没有SSH的加密功能。 今天分析的都是这种RPC。

单一主机内的IPC(Inter-Process Communication)常常利用PIPE,FIFO机制实现。 而跨主机IPC都是通过网络来进行的。 因此,RPC作为IPC的一种应用,常常利用Socket机制来实现。 主机A上的请求进程使用Socket发送请求, 主机B上的服务进程使用Socket接收请求,执行请求,并使用Socket返回请求结果。

作为两个远程进程之间的通信,他们必需商定一种协议相互配合完成RPC。 这个协议就是对请求的封装,封装成双方都认识的数据结构,从而能了解对方的意图。 一个对远程程序调用的请求可以用以下数据结构简单表示:

1
2
3
4
struct RemoteProcessCall {
	char cmd[256];       // 命令字符串,带参数
	char stdout[10240];  // 命令STDOUT输出
};

这几乎是一个最简单的RPC数据结构, 没有STDIN,STDOUT,STDERR部份信息,命令和参数也合在了一起,并有一个最大长度上限。 请求进程只需把需要执行的命令填入cmd部份, 服务进程在其主机上执行这个命令,并获得程序的STDOUT填入stdout数组,返回即可。 请求结构和返回结构不一定使用同一个数据结构。 可以使用两个数据结构,一个表示请求,一个表示返回,防止冗余,并映射到使用同一个ID号即可。

有了PRC数据结构, 下面考虑怎样在两个主机上发送和接收。 首先定义一个头文件包含使用的公共数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
#ifndef _RPC_H_
#define _PRC_H_

struct RemoteProcessCall {
  char cmd[256];
  char stdout[10240];
};

#define RPCHOST "127.0.0.1"
#define RPCPORT 8081

#endif

然后考虑较简单的请求进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "rpc.h"

int main(int argc, char *argv[])
{
  int rpc_fd;
  struct sockaddr_in rpc_addr;

  struct RemoteProcessCall rpc;
  ssize_t nbytes;
  ssize_t recvd_nbytes;
  
  memset(&rpc_addr, 0, sizeof(rpc_addr));
  rpc_addr.sin_family = AF_INET;
  rpc_addr.sin_addr.s_addr = inet_addr(RPCHOST);
  rpc_addr.sin_port = htons(RPCPORT);

  if((rpc_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    perror("create socket error!");
    return 1;
  }

  if(connect(rpc_fd, (struct sockaddr*)&rpc_addr, sizeof(struct sockaddr)) < 0)   {
    perror("connect error!");
    return 2;
  }

  // send rpc call
  memcpy(rpc.cmd, argv[1], strlen(argv[1])+1);
  if((nbytes = send(rpc_fd, &rpc, sizeof(rpc), 0)) < sizeof(rpc)) {
    perror("send error!");
    return 3;
  }
  
  // recieve rpc call
  recvd_nbytes = 0;
  while((nbytes=recv(rpc_fd, (char*)&rpc+recvd_nbytes, sizeof(rpc), 0)) > 0) {
    recvd_nbytes += nbytes;
    if (recvd_nbytes == sizeof(rpc)) break;
  }
  if (recvd_nbytes != sizeof(rpc)) {
    perror("recv call bad!");
    close(rpc_fd);
    return 4;
  }

  // output result
  printf("%s\n", rpc.stdout);

  close(rpc_fd);
  return 0;
}

基本就是单一进程socket访问的流程,创建socket,connect连接服务端, send发送构造好的数据包, recv阻塞等待调用完成,然后输出到终端。

服务端复杂一些,首先创建服务端Socket侦听程序, 一旦有请求过来,fork一个子进程进入run_rpc处理RPC请求。 子进程使用popen系统调用执行命令,这样可以获取程序的输出,但是不能获取返回值。 其它执行命令的方案可以是system(),可以获取程序的返回值但不能获取输出。 或者是再fork()子进程并exec()去执行命令,这样可以进行更详细的控制。 popen执行命令之后,获取输出填入rpc call并发回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/wait.h>
#include "rpc.h"

void run_rpc(int fd) {
  struct RemoteProcessCall call;
  FILE* clfd;
  ssize_t nbytes;
  int recvd_nbytes;

  recvd_nbytes = 0;
  while((nbytes=recv(fd, (char*)&call+recvd_nbytes, sizeof(call), 0)) > 0) {
    recvd_nbytes += nbytes;
    if (recvd_nbytes == sizeof(call)) break;
  }
  if (recvd_nbytes != sizeof(call)) {
    perror("recv call bad!");
    close(fd);
    exit(-1);    
  }
  printf("recv command: %s\n", call.cmd);

  clfd = popen(call.cmd, "r");
  if((nbytes=fread(call.stdout, 1, sizeof(call.stdout), clfd)) < 0) {
    perror("get stdout error!");
    exit(-1);
  }
  pclose(clfd);
  printf("run command over,  stdout size: %d\n", nbytes); // TODO nbytes

  if((nbytes=send(fd, &call, sizeof(call), 0)) != sizeof(call)) {
    perror("send call bad!");
    close(fd);
    exit(-1);
  }
  printf("send result: %s\n", call.cmd);

  close(fd);
  exit(0);
}

void sig_child(int signo)
{
    int status;
    wait(&status);
    //printf("process exit status %d\n", WEXITSTATUS(status));
}

int main(int argc, char *argv[])
{
  int server_fd, client_fd;
  struct sockaddr_in server_addr;
  int pid;

  if((server_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
    perror("create socket error!");
    return 1;
  }
  
  server_addr.sin_family = AF_INET;
  server_addr.sin_port=htons(RPCPORT);
  server_addr.sin_addr.s_addr = INADDR_ANY;
  bzero(&(server_addr.sin_zero), 8);

  if(bind(server_fd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr)) < 0) {
    perror("bind error!");
    return 2;
  }

  if(listen(server_fd, 10) < 0) {
    perror("listen error!");
    return 3;
  }
  
  signal(SIGCHLD, sig_child);
  for(;;) {
    if((client_fd = accept(server_fd, NULL, NULL)) < 0) {
      perror("accept error!");
      continue;
    }
    
    if((pid = fork()) < 0) {
      perror("fork error!");
      close(client_fd);
      close(server_fd);
      return 4;
    }
    if(pid == 0) {    // child
      close(server_fd);
      run_rpc(client_fd);
    }
    // father
    close(client_fd);
  }
  return 0;
}

执行的效果如下,服务端:

1
2
3
4
5
6
7
$./server                        
recv command: ps
run command over,  stdout size: 145
send result: ps
recv command: ps aux
run command over,  stdout size: 6175
send result: ps aux

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$./client "ps"
  PID TTY          TIME CMD
  269 pts/1    00:00:00 zsh
  480 pts/1    00:00:00 server
  492 pts/1    00:00:00 server
  493 pts/1    00:00:00 ps

$./client "ps aux"
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root         1  0.1  1.0   4964  2652 ?        Ss   08:59   0:01 /sbin/init
root         2  0.0  0.0      0     0 ?        S    08:59   0:00 [kthreadd]
root         3  0.0  0.0      0     0 ?        S    08:59   0:00 [ksoftirqd/0]
root         4  0.0  0.0      0     0 ?        S    08:59   0:00 [kworker/0:0]
root         5  0.0  0.0      0     0 ?        S<   08:59   0:00 [kworker/0:0H]
root         6  0.0  0.0      0     0 ?        S    08:59   0:00 [kworker/u:0]
root         7  0.0  0.0      0     0 ?        S<   08:59   0:00 [kworker/u:0H]
...

更多可扩展的问题:

  • 服务端转为Daemon进程
  • 网络字节序的调整
  • stderr的获取
  • 网络传输字节的加密