UNIX TCP回射服务器/客户端(6)

程序简介:这是一个基本的线程池服务器模型。服务器启动时,先启动N个用于处理与客户端通信的子进程,主进程与每个子进程之间建立两条用于通信管道,并关闭各自不需要的端口。当一个客户端连接上服务器时(accept),主进程就随机选择一个子进程,通过管理向其传递socket的描述符,与子进程来处理与客户端的通信,通信结束后,子进程通信管道向主进程报告通信结束。

上代码:

#include "my_unp.h"

typedef struct
{
 pid_t child_pid;  //子进程的ID
 int child_pipefd;  //子进程与父进程的通信管道
 int child_status;  //子进程状态,0为准备好了
 long child_count;  //子进程的处理号
} Child;

Child *cptr;
static int  nchildren;

ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
{
 struct msghdr msg;
 struct iovec iov[1];

//保证cmsghdr和msg_control的对齐 
 union
 {
  struct cmsghdr cm;
  char    control[CMSG_SPACE(sizeof(int))];
 } control_un;
 struct cmsghdr *cmptr;
 //设置辅助缓冲区和长度
 msg.msg_control = control_un.control;
 msg.msg_controllen = sizeof(control_un.control);

//只需要一组附属数据就够了,直接通过CMSG_FIRSTHDR取得
 cmptr = CMSG_FIRSTHDR(&msg);
 //设置必要的字段,数据和长度 
 cmptr->cmsg_len = CMSG_LEN(sizeof(int));
 cmptr->cmsg_level = SOL_SOCKET;
 //指明发送的是描述符 
 cmptr->cmsg_type = SCM_RIGHTS;
 //把fd写入辅助数据中
 *((int *) CMSG_DATA(cmptr)) = sendfd;

//UDP才需要这个,直接为空
 msg.msg_name = NULL;
 msg.msg_namelen = 0;

//设置数据缓冲区,实际上1个字节就够了
 iov[0].iov_base = ptr;
 iov[0].iov_len = nbytes;
 msg.msg_iov = iov;
 msg.msg_iovlen = 1;

//发送
 return sendmsg(fd, &msg, 0);
}

ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
{
 struct msghdr msg;
 struct iovec iov[1];
 ssize_t  n;

//保证cmsghdr和msg_control的对齐 
 union
 {
  struct cmsghdr cm;
  char    control[CMSG_SPACE(sizeof(int))];
 } control_un;
 struct cmsghdr *cmptr;

//设置辅助数据缓冲区和长度 
 msg.msg_control = control_un.control;
 msg.msg_controllen = sizeof(control_un.control);

//UDP才需要这个,直接为空
 msg.msg_name = NULL;
 msg.msg_namelen = 0;

//设置数据缓冲区
 iov[0].iov_base = ptr;
 iov[0].iov_len = nbytes;
 msg.msg_iov = iov;
 msg.msg_iovlen = 1;

//设置结束,开始接收 
 if ( (n = recvmsg(fd, &msg, 0)) <= 0)
  return(n);

//检查一下返回结果
 if ( (cmptr = CMSG_FIRSTHDR(&msg)) != NULL &&
  cmptr->cmsg_len == CMSG_LEN(sizeof(int)) )
 {
  if (cmptr->cmsg_level != SOL_SOCKET)
   error_quit("control level != SOL_SOCKET");
  if (cmptr->cmsg_type != SCM_RIGHTS)
   error_quit("control type != SCM_RIGHTS");
  //终于拿到描述符了
  *recvfd = *((int *) CMSG_DATA(cmptr));
 }
 //出错,没有拿到描述符
 else
  *recvfd = -1;

return n;
}

//接受来自客户端字符串,然后再原样返回
void str_echo(int sockfd)
{
 ssize_t n; 
 char buf[MAXLINE]; 

again: 
 //从套接字中读取数据,写到buffer中去 
 //再将buffer中的数据写到套接字中去 
 while( (n=read(sockfd, buf, MAXLINE)) > 0 ) 
  Writen(sockfd, buf, n);

//由于信号中断,没写或读成功任何数据 
 if( n<0 && errno==EINTR ) 
  goto again; 
 else if( n < 0 ) 
  error_quit("str_echo: read error"); 
}

//子进程的主要操作函数
void child_main(int i, int listenfd, int addrlen)
{
 char c;
 int connfd;
 ssize_t n;

printf("child %ld starting\n", (long) getpid());
 while(1)
 {
  //当子进程没事干的时候,就阻塞在这里,等待父进程的调度
  n = read_fd(STDERR_FILENO, &c, 1, &connfd);
  if ( n < 0 )
   error_quit("read_fd error");
  if (connfd < 0)
   error_quit("no descriptor from read_fd");
  //处理客户请求
  str_echo(connfd);
  //关闭套接字
  Close(connfd);

//向父进程发送信息,报告自己处于空闲状态,准备好接收新请求
  Write(STDERR_FILENO, "", 1);
 }
}

//产生子进程,并创建通信管道
pid_t child_make(int i, int listenfd, int addrlen)
{
 int  sockfd[2];
 pid_t pid;

//在创建子进程之前,创造一对未命名的、相互连接的UNIX域套接字
 Socketpair(AF_LOCAL, SOCK_STREAM, 0, sockfd);
 if ( (pid = Fork()) > 0)
 {
  //父进程关闭不需要管道端口
  //设置控制子进程的结构体数组
  Close(sockfd[1]);
  cptr[i].child_pid = pid;
  cptr[i].child_pipefd = sockfd[0];
  cptr[i].child_status = 0;
  return(pid);  /* parent */
 }

//将子进程的标准错误重定向到套接字中
 Dup2(sockfd[1], STDERR_FILENO);

//关闭不需要的端口
 Close(sockfd[0]);
 Close(sockfd[1]);
 Close(listenfd);
 //该函数从不会返回
 child_main(i, listenfd, addrlen);
}

int main(int argc, char **argv)
{
 int  listenfd, i, navail, maxfd, nsel, connfd, rc;
 ssize_t  n;
 fd_set  rset, masterset;
 socklen_t addrlen, clilen;
 struct sockaddr *cliaddr;
 struct sockaddr_in servaddr; 

if( argc != 2 )
  error_quit("Using: server <child num>");

//创建用于TCP协议的套接字 
 listenfd = Socket(AF_INET, SOCK_STREAM, 0); 
 memset(&servaddr, 0, sizeof(servaddr)); 
 servaddr.sin_family = AF_INET; 
 servaddr.sin_addr.s_addr = htonl(INADDR_ANY); 
 servaddr.sin_port = htons(SERV_PORT); 

//把socket和socket地址结构联系起来   
 Bind(listenfd, (SA*)&servaddr, sizeof(servaddr)); 
 //开始监听LISTENQ端口 
 Listen(listenfd, LISTENQ); 

FD_ZERO(&masterset);
 FD_SET(listenfd, &masterset);
 maxfd = listenfd;
 cliaddr = Malloc( sizeof(addrlen) );

nchildren = atoi(argv[1]);
 navail = nchildren;
 cptr = Calloc(nchildren, sizeof(Child));

//创建子进程及其通信管理
 for (i = 0; i < nchildren; i++)
 {
  //父进程返回,子进程不返回
  child_make(i, listenfd, addrlen);
  //增加监听描述符到监听集合中
  FD_SET(cptr[i].child_pipefd, &masterset);
  //设置最大的监听描述符
  maxfd = max(maxfd, cptr[i].child_pipefd);
 }

while(1)
 {
  rset = masterset;
  //如果没有可用的子进程,就晳时关闭套接字(不accept)
  //将请求阻塞在内核中
  if (navail <= 0)
   FD_CLR(listenfd, &rset);
  nsel = Select(maxfd + 1, &rset, NULL, NULL, NULL);

//如果套接字变成可读,就接收请求,
  //并找出第一个可用的子进程,传递描述符,让子进程处理请求
  if (FD_ISSET(listenfd, &rset))
  {
   //接收请求
   clilen = addrlen;
   connfd = Accept(listenfd, cliaddr, &clilen);

//找出可用的子进程(这里负载不均衡,但对性能基本没影响)
   for (i = 0; i < nchildren; i++)
    if (cptr[i].child_status == 0)
     break;
   if (i == nchildren)
    error_quit("no available children");

//更新该子进程的结构体
   cptr[i].child_status = 1;
   cptr[i].child_count++;
   navail--;

//向子进程发出通知,让其处理请求请求
   n = write_fd(cptr[i].child_pipefd, "", 1, connfd);
   if( n < 0 )
    error_quit("write_fd error");
   Close(connfd);
   if (--nsel == 0)
    continue;
  }

//子进程完成处理后,通知父进程更新结构
  for (i = 0; i < nchildren; i++)
  {
   if (FD_ISSET(cptr[i].child_pipefd, &rset))
   {
    if ( (n = Read(cptr[i].child_pipefd, &rc, 1)) == 0)
     error_quit("child %d terminated unexpectedly", i);
    cptr[i].child_status = 0;
    navail++;
    if (--nsel == 0)
     break;
   }
  }
 }
 return 0;
}

与其配套的客户端在这里:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/705b6d211bf1cc988c7ad0b9b8966423.html