Wednesday, January 26, 2011

Linux Socket Programming, 五個I/O模型

I/O 模型


五個I/O模型

     阻塞I/O
     非阻塞I/O
     I/O復用(select和poll)
     信號驅動I/O(SIGIO)
     異步I/O


阻塞 I/O模型

進程調用recvfrom,此系統調用直到數據報到達且拷貝到應用緩沖區或是出錯才返回。最常見的錯誤是系統調用被信號中斷,進程阻塞的整段時間是指從調用recvfrom開始到它返回的這段時間,當進程返回成功指示時,應用進程開始處理數據報。

非阻塞方式

當 請求的I/O操作不能完成時,不讓進程睡眠,而應返回一個錯誤。 前三次調用recvfrom時仍無數據返回,因此內核立即返回一個錯誤。第四次調用recvfrom時,數據報已準備好,被拷貝到應用緩沖區, recvfrom返回成功指示,接著處理數據。 此過程稱為輪詢(polling)。這對CPU時間是極大的浪費。

I/O復用模型

調用select或poll,在這兩個系統調用中的某一個上阻塞,而不是阻塞真正I/O系統調用。 阻塞select調用,等待數據報套接口可讀。當select返回套接口可讀條件時,調用recevfrom將數據報拷貝到應用緩沖區中。

信號驅動I/O模型

套接口啟動信號驅動I/O, 並通過系統調用sigaction安裝一個信號處理程序。此系統調用立即返回,進程繼續工作,它是非阻塞的。
當數據報準備好被讀時,就為該進程生成一個SIGIO信號。
隨即可以在信號處理程序中調用recvfrom來讀數據報,井通知主循環數據已準備好被處理中。也可以通知主循環,讓它來讀數據報。

異步I/O模型

讓內核啟動操作,並在整個操作完成(包括將數據從內核拷貝到用戶自己的緩沖區)通知用戶。
信號驅動I/O:由內核通知我們何時可以啟動一個I/O操作,
異步I/O模型:由內核通知我們I/O操作何時完成。



select 函數


允許進程指示內核等待多個事件中的任一個發生,並僅在一個或多個事件發生或經過某指定的時間才喚醒進程。
作為一個例子,我們可以調用函數select並通知內核僅在下列情況發生時才返回
集合{1,4,5}中的任何報述字準備好讀
集合{2,7}的任何描述字準備好寫
集合{1,4}中的任何描述字有異常條件待處理
已經過了10.2秒
通知內核我們對哪些描述字感興趣(讀、寫或異常條件)以及等待多長時間。

描述字不受限套接口:任何描述字(例如文件描述字)都可用select來測試。

select 定義

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);

maxfdp1 : 描述字最大值
readset : 讀描述字集
writeset : 寫描述字集
exceptset : 異常條件的描述字集
timeout : 等待時間

readset, writeset和exceptset

讓內核測試讀、寫和異常條件所需的描述字。

為這三個參數的每一個指定一個或多個描述字值

描述字集,是一個整數數組,每個數中的每一位對應一個描述字。

數組的第一個元素對應描述字0-31,

數組戶的第二個元素對應描述字3263。

例子:

fd_set rset; //定義描述字集數據類型

FD_ZERO (&rset); //對描述字集初始化

FD_SET(1, &rset); //打開描述字的第1位

FD_SET(4, &rset) // //打開描述字的第4位

......

FD_ISSET(4, &rest) //測試描述字的第4位

FD_CLR(4, &rset) // //關閉描述字的第4位

readset 套接口準備好讀

套接口接收緩沖區中的數據字節數>=

套接口接收緩沖區低潮限度的當前值

連接的讀這一半關閉(接收了FIN的TCP連接)

套接口是一個監聽套接口旦已完成的連接數為非0。

有一個套接口錯誤待處理。

writeset 套接口準備好寫

套接口發送緩沖區中的可用空間字節數大幹等套接口發送緩沖區低潮限度的

當前值,且或者(i)套接口已連接,或者(i)套接口不要求連接

連接的寫這一半關閉。對這樣的套接口的寫操作將產生信SIGPIPE。

有一個套接口錯誤待處理。對這樣的套接口的寫操作將不阻塞且返回一個錯誤(一1)

exceptset異常條件待處理

如果一個套接口存在帶外數據或者仍處帶外標記,那它有異常條件待處理。

帶外數據(outofband data),有時也稱為加速數據(expedited data),

是指連接雙方中的一方發生重要事情,想要迅速地通知對方。

這種通知在已經排隊等待發送的任何“普通”(有時稱為“帶內”)數據之前發送。

帶外數據設計為比普通數據有更高的優先級。

帶外數據是映射到現有的連接中的,而不是在客戶機和服務器間再用一個連接。

最大描述字 maxfdp1

當select剛開始設計時,操作系統常對每個進程可用的最大描述字數上限作出

限制(4.2BSD的限制為31),select也就用相同的限制值。

unix版本對每個進程的描述字數根本不作限制 (僅受限內存量和管理性限制),

#include <sys/types.h>

#DEFINE FD_SETSIZE 256



str_cli 函數的修訂版


服務器進程一終止客戶就能馬上得到通知

早期版本的問題就在當套接口上發生了某些事件時,客戶可能阻塞fgets調用,

新版本則阻塞select調用:等待標準輸入,等待套接口可讀。

//舊的回射服務器客戶端main程序

#include “unp.h“ //包含頭文件
int main(int argc, char **argv) //argv是命令行的第二個參數
{ int sockfd; //套接口描述字
struct sockaddr_in servaddr; //IPv4地址結構
if (argc != 2) //命令行要有第二個參數(服務器地址)
err_quit("usage: tcpcli <IPaddress>");
sockfd = Socket(AF_INET, SOCK_STREAM, 0); //
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(SERV_PORT);
Inet_pton(AF_INET, argv[1], &servaddr.sin_addr);
Connect(sockfd, (SA *) &servaddr, sizeof(servaddr));
str_cli(stdin, sockfd); /* do it all */
exit(0);
}

//str_cli
#include "unp.h"
void str_cli ( FILE *fp, int sockfd)
{
char sendline[MAXLINE], recvline[MAXLINE];

while (Fgets ( sendline, MAXLINE, fp) ! = NULL) {
Writen( sockfd, sendline, strlen(sendline) );
if ( Readline ( sockfd, recvline, MAXLINE ) = = 0)
err_quit("str_cli: server terminated prematurely");
Fputs(recvline, stdout);
}
}

對套接口的處理

對方TCP發送數據,套接口就變為可讀且read返回大0

對方TCP發送一個FIN(對方進程終止),套接口就變為可讀且read返回0(文件結束)。

對方TCP發送一個RST (對方主機崩潰並重新啟動),套接口變為可讀且返回-1

#include "unp.h"
void str_cli(FILE *fp, int sockfd)
{ int maxfdp1; //最大描述字
fd_set rset; //描述字集
char sendline[MAXLINE], recvline[MAXLINE];
FD_ZERO(&rset); //描述字集清零(空集)
for ( ; ; ) {
FD_SET(fileno(fp), &rset); //打開文件描述字的測試
FD_SET(sockfd, &rset); // //打開套接口描述字的測試
maxfdp1 = max(fileno(fp), sockfd) + 1; //獲得最大描述字
Select(maxfdp1, &rset, NULL, NULL, NULL); //對是否可讀進行測試
if (FD_ISSET(sockfd, &rset)) { //如果套接口可讀
if (Readline(sockfd, recvline, MAXLINE) = = 0) //讀入一行
err_quit(“str_cli: server termi. premat..”); //對方終止時退出
Fputs(recvline, stdout); //寫到標準輸出
}
if (FD_ISSET(fileno(fp), &rset)) { //如果標準輸入可讀
if (Fgets(sendline, MAXLINE, fp) == NULL) //讀入一行
return; // 遇到^D時退出子程序
Writen(sockfd, sendline, strlen(sendline)); //寫入套接口
}
}
}




shutdown 函數


close有兩個限制可由函數shutdown來避免:

close將描述字的訪問計數減1,僅在此計數為0時才關閉套接口
shutdown可激發TCP的正常連接終止序列, 而不管訪問計數。

close終止了數據傳送的兩個方向:讀和寫。
shutdown終止的數據傳送的兩個方向:讀和寫, 或其中任一方向:讀或寫

定義:
int shutdown( int sockfd, int howto) ;

howto選項:
SHUT_RD 關閉連接的讀一半
SHUT_WR 關閉連接的寫這一半
SHUT_RDWR 關閉連接讀讀和寫



str_cli函數(再修訂)


void str_cli(FILE *fp, int sockfd)
{ int maxfdp1, stdineof ; fd_set rset;
char sendline[MAXLINE], recvline[MAXLINE];
stdineof=0; FD_ZERO(&rset);
for ( ; ; ) {
if (stdineof == 0) FD_SET(fileno(fp), &rset);
FD_SET(sockfd, &rset);
maxfdp1 = max(fileno(fp), sockfd) + 1;
Select(maxfdp1, &rset, NULL, NULL, NULL);
if (FD_ISSET(sockfd, &rset)) { /* socket is readable */
if (Readline(sockfd, recvline, MAXLINE) == 0) {
if (stdineof == 1) return; /* normal termination */
else err_quit(“str_cli: server ERR. "); }
Fputs(recvline, stdout);
}
if (FD_ISSET(fileno(fp), &rset)) { /* input is readable */
if (Fgets(sendline, MAXLINE, fp) == NULL) {
stdineof = 1;
Shutdown(sockfd, SHUT_WR); /* send FIN */
FD_CLR(fileno(fp), &rset);
continue; }
Writen(sockfd, sendline, strlen(sendline));
} } }




TCP回射服務器程序(修訂版)


使用select來處理任意數目的客戶的單進程程序

不為每個客戶派生一個子進程,避免了創建一個新進程的所有開銷。

監聽套接口

服務器只維護一個讀描述字集

描述字0、1和2分別被設置為標準輸入、標準輸出和標準錯誤輸出

監聽套接口的第一個可用的描述字是3。

與第一個客戶建立連接

監聽描述字變為可讀,是服務器調用accept。

由accept返回的新的已連接描述字將是4。

第一個客戶終止與服務器的連接

客戶TCP發送一個FIN,這使得服務器中的描述字4變為可讀。

當服務器讀此已連接套接口時,readline返回0。

關閉此套接口並相應地更新數據結構,數組元素client[0]]的值置為一1,

描述字集中的描述字4被置為0,maxfd的值沒有改變。

//源程序

int main(int argc, char **argv)
{ int i, maxi, maxfd, listenfd, connfd, sockfd;
int nready, client[FD_SETSIZE];
ssize_t n;
fd_set rset, allset;
char line[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
listenfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
Listen(listenfd, LISTENQ);
maxfd = listenfd; /* initialize */
maxi = -1; /* index into client[] array */
for (i = 0; i < FD_SETSIZE; i++)
client[i] = -1; /* -1 indicates available entry */
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
for ( ; ; ) {
rset = allset; /* structure assignment */
nready = Select(maxfd+1, &rset, NULL, NULL, NULL);
if (FD_ISSET(listenfd, &rset)) { /* new client connection */
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
for (i = 0; i < FD_SETSIZE; i++)
if (client[i] < 0) {
client[i] = connfd; /* save descriptor */
break
}
if (i == FD_SETSIZE) err_quit("too many clients");
FD_SET(connfd, &allset); /* add new descriptor to set */
if (connfd > maxfd) maxfd = connfd; /* for select */
if (i > maxi) maxi = i; /* max index in client[] array */
if (--nready <= 0) continue; /* no more readable descriptors */
}
for (i = 0; i <= maxi; i++) { /* check all clients for data */
if ( (sockfd = client[i]) < 0) continue;
if (FD_ISSET(sockfd, &rset)) {
if ( (n = Readline(sockfd, line, MAXLINE)) == 0) {
/*4connection closed by client */
Close(sockfd);
FD_CLR(sockfd, &allset);
client[i] = -1;
}
else Writen(sockfd, line, n);
if (--nready <= 0) break; /* no more readable descriptors */
}
}
} //end of for loop
} //end of main



poll 函數


原型:

int poll (struct pollfd *fdarray, unsigned long nfds, int timeout )

第一個參數是指向結構數組第一個元素的指針:
struct pollfd{
int fd; // descriptor
short events // events of interest on fd
short revents // events that occured on fd
}
第二個參數是套接字個數,第三個參數是等待時間 



TCP回射服務器程序(再修訂)


用poll而不是用select來重寫回射服務器程序。

在select版本中,必須分配一個client數組以及一個名為rset的描述字集。

使用poll時, 必須分配一個poll結構的數組來維護客戶信息,而不是分配另一個數組。

與select中處理數組client相同的方法, 處理此數組的fd成員,值一1表示條目未用,

否則即為描述字值。

傳遞給poll的pollfd結構數組中的任何N成員為負值的條目都是被忽略的。

//源程序:
#include "unp.h"
#include <limits.h> /* for OPEN_MAX */
int main(int argc, char **argv)
{
int i, maxi, listenfd, connfd, sockfd;
int nready; ssize_t n;
char line[MAXLINE];
socklen_t clilen;
struct pollfd client[OPEN_MAX];
struct sockaddr_in cliaddr, servaddr;
listenfd = Socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
Bind(listenfd, (SA *) &servaddr, sizeof(servaddr));
Listen(listenfd, LISTENQ);
client[0].fd = listenfd;
client[0].events = POLLRDNORM;
for (i = 1; i < OPEN_MAX; i++)
client[i].fd = -1; /* -1 indicates available entry */
maxi = 0; 
for ( ; ; ) {
nready = Poll(client, maxi+1, INFTIM);
/* new client connection */
if (client[0].revents & POLLRDNORM) {
clilen = sizeof(cliaddr);
connfd = Accept(listenfd, (SA *) &cliaddr, &clilen);
for (i = 1; i < OPEN_MAX; i++)
if (client[i].fd < 0) {
client[i].fd = connfd; // save descriptor 
break;
}
if (i == OPEN_MAX) 
err_quit("too many clients");
client[i].events = POLLRDNORM;
if (i > maxi)
maxi = i; /* max index in client[] array */
if (--nready <= 0)
continue; /* no more readable descriptors */
}
for (i = 1; i <= maxi; i++) { /* check all clients for data */
if ( (sockfd = client[i].fd) < 0) continue;
if (client[i].revents & (POLLRDNORM | POLLERR)) {
if ( (n = readline(sockfd, line, MAXLINE)) < 0) {
if (errno == ECONNRESET) {
/*4connection reset by client */
Close(sockfd);
client[i].fd = -1;
} else 
err_sys("readline error");
} else if (n == 0) {
/*4connection closed by client */
Close(sockfd);
client[i].fd = -1;
} else
Writen(sockfd, line, n);
if (--nready <= 0)
break; /* no more readable descriptors */
}
}
}
}

1 comment: