前言
Socket在實(shí)際系統(tǒng)程序開發(fā)當(dāng)中,應(yīng)用非常廣泛,也非常重要。實(shí)際應(yīng)用中服務(wù)器經(jīng)常需要支持多個(gè)客戶端連接,實(shí)現(xiàn)高并發(fā)服務(wù)器模型顯得尤為重要。高并發(fā)服務(wù)器從簡(jiǎn)單的循環(huán)服務(wù)器模型處理少量網(wǎng)絡(luò)并發(fā)請(qǐng)求,演進(jìn)到解決C10K,C10M問題的高并發(fā)服務(wù)器模型。本文通過一個(gè)簡(jiǎn)單的多線程模型,帶領(lǐng)大家學(xué)習(xí)如何自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的并發(fā)服務(wù)器。
C/S架構(gòu)
服務(wù)器-客戶機(jī),即Client-Server(C/S)結(jié)構(gòu)。C/S結(jié)構(gòu)通常采取兩層結(jié)構(gòu)。服務(wù)器負(fù)責(zé)數(shù)據(jù)的管理,客戶機(jī)負(fù)責(zé)完成與用戶的交互任務(wù)。
在C/S結(jié)構(gòu)中,應(yīng)用程序分為兩部分:服務(wù)器部分和客戶機(jī)部分。服務(wù)器部分是多個(gè)用戶共享的信息與功能,執(zhí)行后臺(tái)服務(wù),如控制共享數(shù)據(jù)庫(kù)的操作等;客戶機(jī)部分為用戶所專有,負(fù)責(zé)執(zhí)行前臺(tái)功能,在出錯(cuò)提示、在線幫助等方面都有強(qiáng)大的功能,并且可以在子程序間自由切換。
如上圖所示:這是基于套接字實(shí)現(xiàn)客戶端和服務(wù)器相連的函數(shù)調(diào)用關(guān)系,socket API資料比較多,本文不再過多敘述。
pthread線程庫(kù):(POSIX)
pthread線程庫(kù)是Linux下比較常用的一個(gè)線程庫(kù),關(guān)于他的用法和特性大家可以自行搜索相關(guān)文章,下面只簡(jiǎn)單介紹他的用法和編譯。
線程標(biāo)識(shí)
線程有ID, 但不是系統(tǒng)唯一, 而是進(jìn)程環(huán)境中唯一有效.線程的句柄是pthread_t類型, 該類型不能作為整數(shù)處理, 而是一個(gè)結(jié)構(gòu).下面介紹兩個(gè)函數(shù):
頭文件:?<pthread.h>
原型:?int?pthread_equal(pthread_t?tid1,?pthread_t?tid2);
返回值:?相等返回非0,?不相等返回0.
說明:?比較兩個(gè)線程ID是否相等.
頭文件:?<pthread.h>
原型:?pthread_t?pthread_self();
返回值:?返回調(diào)用線程的線程ID.
線程創(chuàng)建
在執(zhí)行中創(chuàng)建一個(gè)線程, 可以為該線程分配它需要做的工作(線程執(zhí)行函數(shù)), 該線程共享進(jìn)程的資源. 創(chuàng)建線程的函數(shù)pthread_create()
頭文件:?<pthread.h>
原型:?int?pthread_create(pthread_t?*restrict?tidp,?const?pthread_attr_t?*restrict?attr,?void?*(start_rtn)(void),?void?*restrict?arg);
返回值:?成功則返回0,?否則返回錯(cuò)誤編號(hào).
參數(shù):
tidp:?指向新創(chuàng)建線程ID的變量,?作為函數(shù)的輸出.
attr:?用于定制各種不同的線程屬性,?NULL為默認(rèn)屬性(見下).
start_rtn:?函數(shù)指針,?為線程開始執(zhí)行的函數(shù)名.該函數(shù)可以返回一個(gè)void?*類型的返回值,
而這個(gè)返回值也可以是其他類型,并由?pthread_join()獲取
arg:?函數(shù)的唯一無類型(void)指針參數(shù),?如要傳多個(gè)參數(shù),?可以用結(jié)構(gòu)封裝.
編譯
因?yàn)閜thread的庫(kù)不是linux系統(tǒng)的庫(kù),所以在進(jìn)行編譯的時(shí)候要加上?????-lpthread
#?gcc?filename?-lpthread??//默認(rèn)情況下gcc使用c庫(kù),要使用額外的庫(kù)要這樣選擇使用的庫(kù)
常見的網(wǎng)絡(luò)服務(wù)器模型
本文結(jié)合自己的理解,主要以TCP為例,總結(jié)了幾種常見的網(wǎng)絡(luò)服務(wù)器模型的實(shí)現(xiàn)方式,并最終實(shí)現(xiàn)一個(gè)簡(jiǎn)單的命令行聊天室。
單進(jìn)程循環(huán)
單線進(jìn)程循環(huán)原理就是主進(jìn)程沒和客戶端通信,客戶端都要先連接服務(wù)器,服務(wù)器接受一個(gè)客戶端連接后從客戶端讀取數(shù)據(jù),然后處理并將處理的結(jié)果返還給客戶端,然后再接受下一個(gè)客戶端的連接請(qǐng)求。
優(yōu)點(diǎn)
單線程循環(huán)模型優(yōu)點(diǎn)是簡(jiǎn)單、易于實(shí)現(xiàn),沒有同步、加鎖這些麻煩事,也沒有這些開銷。
缺點(diǎn)
- 阻塞模型,網(wǎng)絡(luò)請(qǐng)求串行處理;
- 沒有利用多核cpu的優(yōu)勢(shì),網(wǎng)絡(luò)請(qǐng)求串行處理;
- 無法支持同時(shí)多個(gè)客戶端連接;
- 程序串行操作,服務(wù)器無法實(shí)現(xiàn)同時(shí)收發(fā)數(shù)據(jù)。
單線程IO復(fù)用
linux高并發(fā)服務(wù)器中常用epoll作為IO復(fù)用機(jī)制。線程將需要處理的socket讀寫事件都注冊(cè)到epoll中,當(dāng)有網(wǎng)絡(luò)IO發(fā)生時(shí),epoll_wait返回,線程檢查并處理到來socket上的請(qǐng)求。
優(yōu)點(diǎn)
- 實(shí)現(xiàn)簡(jiǎn)單, 減少鎖開銷,減少線程切換開銷。
缺點(diǎn)
- 只能使用單核cpu,handle時(shí)間過長(zhǎng)會(huì)導(dǎo)致整個(gè)服務(wù)掛死;
- 當(dāng)有客戶端數(shù)量超過一定數(shù)量后,性能會(huì)顯著下降;
- 只適用高IO、低計(jì)算,handle處理時(shí)間短的場(chǎng)景。
多線程/多進(jìn)程
多線程、多進(jìn)程模型主要特點(diǎn)是每個(gè)網(wǎng)絡(luò)請(qǐng)求由一個(gè)進(jìn)程/線程處理,線程內(nèi)部使用阻塞式系統(tǒng)調(diào)用,在線程的職能劃分上,可以由一個(gè)單獨(dú)的線程處理accept連接,其余線程處理具體的網(wǎng)絡(luò)請(qǐng)求(收包,處理,發(fā)包);還可以多個(gè)進(jìn)程單獨(dú)listen、accept網(wǎng)絡(luò)連接。
優(yōu)點(diǎn):
1、實(shí)現(xiàn)相對(duì)簡(jiǎn)單;
2、利用到CPU多核資源。
缺點(diǎn):
1、線程內(nèi)部還是阻塞的,舉個(gè)極端的例子,如果一個(gè)線程在handle的業(yè)務(wù)邏輯中sleep了,這個(gè)線程也就掛住了。
多線程/多進(jìn)程IO復(fù)用
多線程、多進(jìn)程IO服用模型,每個(gè)子進(jìn)程都監(jiān)聽服務(wù),并且都使用epoll機(jī)制來處理進(jìn)程的網(wǎng)絡(luò)請(qǐng)求,子進(jìn)程 accept() 后將創(chuàng)建已連接描述符,然后通過已連接描述符來與客戶端通信。該機(jī)制適用于高并發(fā)的場(chǎng)景。
優(yōu)點(diǎn):
- 支撐較高并發(fā)。
缺點(diǎn):
- 異步編程不直觀、容易出錯(cuò)
多線程劃分IO角色
多線程劃分IO角色主要功能有:一個(gè)accept thread處理新連接建立;一個(gè)IO thread pool處理網(wǎng)絡(luò)IO;一個(gè)handle thread pool處理業(yè)務(wù)邏輯。使用場(chǎng)景如:電銷應(yīng)用,thrift TThreadedSelectorServer。
優(yōu)點(diǎn):
- 按不同功能劃分線程,各線程處理固定功能,效率更高
- 可以根據(jù)業(yè)務(wù)特點(diǎn)配置線程數(shù)量來性能調(diào)優(yōu)
缺點(diǎn):
- 線程間通信需要引入鎖開銷
- 邏輯較復(fù)雜,實(shí)現(xiàn)難度大
小結(jié)
上面介紹了常見的網(wǎng)絡(luò)服務(wù)器模型,還有AIO、協(xié)程,甚至還有其他的變型,在這里不再討論。重要的是理解每種場(chǎng)景中所面臨的問題和每種模型的特點(diǎn),設(shè)計(jì)出符合應(yīng)用場(chǎng)景的方案才是好方案。
多線程并發(fā)服務(wù)器模型
下面我們主要討論多線程并發(fā)服務(wù)器模型。
代碼結(jié)構(gòu)
并發(fā)服務(wù)器代碼結(jié)構(gòu)如下:
thread_func()
{
while(1)?{
recv(...);
process(...);
send(...);
}
close(...);
}
main(
socket(...);
bind(...);
listen(...);
while(1)?{
accept(...);
pthread_create();
}
}
由上可以看出,服務(wù)器分為兩部分:主線程、子線程。
主線程
main函數(shù)即主線程,它的主要任務(wù)如下:
- socket()創(chuàng)建監(jiān)聽套字;
- bind()綁定端口號(hào)和地址;
- listen()開啟監(jiān)聽;
- accept()等待客戶端的連接,
- 當(dāng)有客戶端連接時(shí),accept()會(huì)創(chuàng)建一個(gè)新的套接字new_fd;
- 主線程會(huì)創(chuàng)建子線程,并將new_fd傳遞給子線程。
子線程
- 子線程函數(shù)為thread_func(),他通過new_fd處理和客戶端所有的通信任務(wù)。
客戶端連接服務(wù)器詳細(xì)步驟
下面我們分步驟來看客戶端連接服務(wù)器的分步說明。
1. 客戶端連接服務(wù)器
- 服務(wù)器建立起監(jiān)聽套接字listen_fd,并初始化;
- 客戶端創(chuàng)建套接字fd1;
- 客戶端client1通過套接字fd1連接服務(wù)器的listen_fd;
2. 主線程創(chuàng)建子線程thread1
- server收到client1的連接請(qǐng)求后,accpet函數(shù)會(huì)返回一個(gè)新的套接字newfd1;
- 后面server與client1的通信就依賴newfd1,監(jiān)聽套接字listen_fd會(huì)繼續(xù)監(jiān)聽其他客戶端的連接;
- 主線程通過pthead_create()創(chuàng)建一個(gè)子線程thread1,并把newfd1傳遞給thread1;
- server與client1的通信就分別依賴newfd1、fd1。
- client1為了能夠?qū)崟r(shí)收到server發(fā)送的信息,同時(shí)還要能夠從鍵盤上讀取數(shù)據(jù),這兩個(gè)操作都是阻塞的,沒有數(shù)據(jù)的時(shí)候進(jìn)程會(huì)休眠,所以必須創(chuàng)建子線程read_thread;
- client1的主線負(fù)責(zé)從鍵盤上讀取數(shù)據(jù)并發(fā)送給,子線程read_thread負(fù)責(zé)從server接受信息。
3. client2連接服務(wù)器
- 客戶端client2創(chuàng)建套接字fd2;
- 通過connect函數(shù)連接server的listen_fd;
4. 主線程創(chuàng)建子線程thread2
- server收到client2的連接請(qǐng)求后,accpet函數(shù)會(huì)返回一個(gè)新的套接字newfd2;
- 后面server與client2的通信就依賴newfd2,監(jiān)聽套接字listen_fd會(huì)繼續(xù)監(jiān)聽其他客戶端的連接;
- 主線程通過pthead_create()創(chuàng)建一個(gè)子線程thread2,并把newfd2傳遞給thread2;
- server與client1的通信就分別依賴newfd2、fd2。
- 同樣client2為了能夠?qū)崟r(shí)收到server發(fā)送的信息,同時(shí)還要能夠從鍵盤上讀取數(shù)據(jù)必須創(chuàng)建子線程read_thread;
- client1的主線負(fù)責(zé)從鍵盤上讀取數(shù)據(jù)并發(fā)送給,子線程read_thread負(fù)責(zé)從server接受信息。
由上圖可見,每一個(gè)客戶端連接server后,server都要?jiǎng)?chuàng)建一個(gè)專門的thread負(fù)責(zé)和該客戶端的通信;每一個(gè)客戶端和server都有一對(duì)固定的fd組合用于連接。
實(shí)例
好了,理論講完了,根據(jù)一口君的慣例,也繼承祖師爺?shù)慕陶d:talk is cheap,show you my code.不上代碼,只寫理論的文章都是在耍流氓。
本例的主要功能描述如下:
- 實(shí)現(xiàn)多個(gè)客戶端可以同時(shí)連接服務(wù)器;
- 客戶端可以實(shí)現(xiàn)獨(dú)立的收發(fā)數(shù)據(jù);
- 客戶端發(fā)送數(shù)據(jù)給服務(wù)器后,服務(wù)器會(huì)將數(shù)據(jù)原封不動(dòng)返回給客戶端。
服務(wù)器端
/*********************************************
服務(wù)器程序??TCPServer.c
公眾號(hào):一口Linux
*********************************************/
#include?<stdio.h>
#include?<sys/types.h>
#include?<sys/socket.h>
#include?<arpa/inet.h>
#include?<errno.h>
#include?<string.h>
#include?<pthread.h>
#include?<stdlib.h>
#define?RECVBUFSIZE?2048
void?*rec_func(void?*arg)
{
int?sockfd,new_fd,nbytes;
char?buffer[RECVBUFSIZE];
int?i;
new_fd?=?*((int?*)?arg);
free(arg);
while(1)
{
if((nbytes=recv(new_fd,buffer,?RECVBUFSIZE,0))==-1)
{
fprintf(stderr,"Read?Error:%sn",strerror(errno));
exit(1);
}
if(nbytes?==?-1)
{//客戶端出錯(cuò)了?返回值-1
close(new_fd);
break;
}
if(nbytes?==?0)
{//客戶端主動(dòng)斷開連接,返回值是0
close(new_fd);
break;
}
buffer[nbytes]='';
printf("I?have?received:%sn",buffer);
if(send(new_fd,buffer,strlen(buffer),0)==-1)
{
fprintf(stderr,"Write?Error:%sn",strerror(errno));
exit(1);
}
}
}
int?main(int?argc,?char?*argv[])
{
char?buffer[RECVBUFSIZE];
int?sockfd,new_fd,nbytes;
struct?sockaddr_in?server_addr;
struct?sockaddr_in?client_addr;
int?sin_size,portnumber;
char?hello[]="Hello!?Socket?communication?world!n";
pthread_t?tid;
int?*pconnsocke?=?NULL;
int?ret,i;
if(argc!=2)
{
fprintf(stderr,"Usage:%s?portnumberan",argv[0]);
exit(1);
}
/*端口號(hào)不對(duì),退出*/
if((portnumber=atoi(argv[1]))<0)
{
fprintf(stderr,"Usage:%s?portnumberan",argv[0]);
exit(1);
}
/*服務(wù)器端開始建立socket描述符??sockfd用于監(jiān)聽*/
if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
{
fprintf(stderr,"Socket?error:%sna",strerror(errno));
exit(1);
}
/*服務(wù)器端填充?sockaddr結(jié)構(gòu)*/
bzero(&server_addr,sizeof(struct?sockaddr_in));
server_addr.sin_family?????=AF_INET;
/*自動(dòng)填充主機(jī)IP*/
server_addr.sin_addr.s_addr=htonl(INADDR_ANY);//自動(dòng)獲取網(wǎng)卡地址
server_addr.sin_port???????=htons(portnumber);
/*捆綁sockfd描述符*/
if(bind(sockfd,(struct?sockaddr?*)(&server_addr),sizeof(struct?sockaddr))==-1)
{
fprintf(stderr,"Bind?error:%sna",strerror(errno));
exit(1);
}
/*監(jiān)聽sockfd描述符*/
if(listen(sockfd,?10)==-1)
{
fprintf(stderr,"Listen?error:%sna",strerror(errno));
exit(1);
}
while(1)
{
/*服務(wù)器阻塞,直到客戶程序建立連接*/
sin_size=sizeof(struct?sockaddr_in);
if((new_fd?=?accept(sockfd,(struct?sockaddr?*)&client_addr,&sin_size))==-1)
{
fprintf(stderr,"Accept?error:%sna",strerror(errno));
exit(1);
}
pconnsocke?=?(int?*)?malloc(sizeof(int));
*pconnsocke?=?new_fd;
ret?=?pthread_create(&tid,?NULL,?rec_func,?(void?*)?pconnsocke);
if?(ret?<?0)
{
perror("pthread_create?err");
return?-1;
}
}
//close(sockfd);
exit(0);
}
客戶端
/*********************************************
服務(wù)器程序??TCPServer.c
公眾號(hào):一口Linux
*********************************************/
#include?<stdio.h>
#include?<sys/types.h>
#include?<sys/socket.h>
#include?<arpa/inet.h>
#include?<errno.h>
#include?<string.h>
#include?<pthread.h>
#include?<stdlib.h>
#define?RECVBUFSIZE?1024
void?*func(void?*arg)
{
int?sockfd,new_fd,nbytes;
char?buffer[RECVBUFSIZE];
new_fd?=?*((int?*)?arg);
free(arg);
while(1)
{
if((nbytes=recv(new_fd,buffer,?RECVBUFSIZE,0))==-1)
{
fprintf(stderr,"Read?Error:%sn",strerror(errno));
exit(1);
}
buffer[nbytes]='';
printf("I?have?received:%sn",buffer);
}
}
int?main(int?argc,?char?*argv[])
{
int?sockfd;
char?buffer[RECVBUFSIZE];
struct?sockaddr_in?server_addr;
struct?hostent?*host;
int?portnumber,nbytes;
pthread_t?tid;
int?*pconnsocke?=?NULL;
int?ret;
//檢測(cè)參數(shù)個(gè)數(shù)
if(argc!=3)
{
fprintf(stderr,"Usage:%s?hostname?portnumberan",argv[0]);
exit(1);
}
//argv2?存放的是端口號(hào)?,讀取該端口,轉(zhuǎn)換成整型變量
if((portnumber=atoi(argv[2]))<0)
{
fprintf(stderr,"Usage:%s?hostname?portnumberan",argv[0]);
exit(1);
}
//創(chuàng)建一個(gè)?套接子
if((sockfd=socket(AF_INET,SOCK_STREAM,0))==-1)
{
fprintf(stderr,"Socket?Error:%san",strerror(errno));
exit(1);
}
//填充結(jié)構(gòu)體,ip和port必須是服務(wù)器的
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_port=htons(portnumber);
server_addr.sin_addr.s_addr?=?inet_addr(argv[1]);//argv【1】?是server?ip地址
/*?í?§3ìDò·¢?eá??ó???ó*/
if(connect(sockfd,(struct?sockaddr?*)(&server_addr),sizeof(struct?sockaddr))==-1)
{
fprintf(stderr,"Connect?Error:%san",strerror(errno));
exit(1);
}
//創(chuàng)建線程
pconnsocke?=?(int?*)?malloc(sizeof(int));
*pconnsocke?=?sockfd;
ret?=?pthread_create(&tid,?NULL,?func,?(void?*)?pconnsocke);
if?(ret?<?0)
{
perror("pthread_create?err");
return?-1;
}
while(1)
{
#if?1
printf("input?msg:");
scanf("%s",buffer);
if(send(sockfd,buffer,strlen(buffer),0)==-1)
{
fprintf(stderr,"Write?Error:%sn",strerror(errno));
exit(1);
}
#endif
}
close(sockfd);
exit(0);
}
編譯
編譯線程,需要用到pthread庫(kù),編譯命令如下:
- gcc s.c -o s -lpthread
- gcc cli.c -o c -lpthread先本機(jī)測(cè)試
- 開啟一個(gè)終端 ./s 8888
- 再開一個(gè)終端 ./cl 127.0.0.1 8888,輸入一個(gè)字符串"qqqqqqq"
- 再開一個(gè)終端 ./cl 127.0.0.1 8888,輸入一個(gè)字符串"yikoulinux"
有讀者可能會(huì)注意到,server創(chuàng)建子線程的時(shí)候用的是以下代碼:
pconnsocke?=?(int?*)?malloc(sizeof(int));
*pconnsocke?=?new_fd;
ret?=?pthread_create(&tid,?NULL,?rec_func,?(void?*)?pconnsocke);
if?(ret?<?0)
{
perror("pthread_create?err");
return?-1;
}
為什么必須要malloc一塊內(nèi)存專門存放這個(gè)新的套接字呢?
這個(gè)是一個(gè)很隱蔽,很多新手都容易犯的錯(cuò)誤。下一章,我會(huì)專門給大家講解。