这一章节我们将详细描述网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。
Listen
func Listen(net, laddr string) (Listener, error) {
la, err := resolveAddr("listen", net, laddr, noDeadline)
......
switch la := la.toAddr().(type) {
case *TCPAddr:
l, err = ListenTCP(net, la)
case *UnixAddr:
......
}
......
}
// 对于tcp协议,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
......
fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
......
return &TCPListener{fd}, nil
}
func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
......
return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}
func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
// 创建底层socket,设置属性为O_NONBLOCK
s, err := sysSocket(family, sotype, proto)
......
setDefaultSockopts(s, family, sotype, ipv6only)
// 创建新netFD结构
fd, err = newFD(s, family, sotype, net)
......
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// 调用底层listen监听创建的套接字
fd.listenStream(laddr, listenerBacklog)
return fd, nil
case syscall.SOCK_DGRAM:
......
}
}
}
// 最终调用该函数来创建一个socket
// 并且将socket属性设置为O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
syscall.ForkLock.RLock()
s, err := syscall.Socket(family, sotype, proto)
if err == nil {
syscall.CloseOnExec(s)
}
syscall.ForkLock.RUnlock()
if err != nil {
return -1, err
}
if err = syscall.SetNonblock(s, true); err != nil {
syscall.Close(s)
return -1, err
}
return s, nil
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
if err := setDefaultListenerSockopts(fd.sysfd)
if lsa, err := laddr.sockaddr(fd.family); err != nil {
return err
} else if lsa != nil {
// Bind绑定至该socket
if err := syscall.Bind(fd.sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
}
// 监听该socket
if err := syscall.Listen(fd.sysfd, backlog);
// 这里非常关键:初始化socket与异步IO相关的内容
if err := fd.init(); err != nil {
return err
}
lsa, _ := syscall.Getsockname(fd.sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该socket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。
对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个监听socket,是如何将其添加到epoll的监听队列中呢?
func (fd *netFD) init() error {
if err := fd.pd.Init(fd); err != nil {
return err
}
return nil
}
func (pd *pollDesc) Init(fd *netFD) error {
// 利用了Once机制,保证一个进程只会执行一次
// runtime_pollServerInit:
// TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
// JMP runtime·netpollServerInit(SB)
serverInit.Do(runtime_pollServerInit)
// runtime_pollOpen:
// TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
// JMP runtime·netpollOpen(SB)
ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
if errno != 0 {
return syscall.Errno(errno)
}
pd.runtimeCtx = ctx
return nil
}
这里就是socket异步编程的关键:
netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd >= 0 {
return
}
epfd = epollcreate(1024)
if epfd >= 0 {
closeonexec(epfd)
return
}
......
}
netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen。
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。
Accept
既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:
func (l *TCPListener) Accept() (Conn, error) {
c, err := l.AcceptTCP()
......
}
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
......
fd, err := l.fd.accept()
......
// 返回给调用者一个新的TCPConn
return newTCPConn(fd), nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
// 为什么对该函数加读锁?
if err := fd.readLock(); err != nil {
return nil, err
}
defer fd.readUnlock()
......
for {
// 这个accept是golang包装的系统调用
// 用来处理跨平台
s, rsa, err = accept(fd.sysfd)
if err != nil {
if err == syscall.EAGAIN {
// 如果没有可用连接,WaitRead()阻塞该协程
// 后面会详细分析WaitRead.
if err = fd.pd.WaitRead(); err == nil {
continue
}
} else if err == syscall.ECONNABORTED {
// 如果连接在Listen queue时就已经被对端关闭
continue
}
}
break
}
netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
......
// 这个前面已经分析,将该fd添加到epoll队列中
err = netfd.init()
......
lsa, _ := syscall.Getsockname(netfd.sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
OK,从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。
一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。
Read
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
return c.fd.Read(b)
}
func (fd *netFD) Read(p []byte) (n int, err error) {
// 为什么对函数调用加读锁
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
// 这个又是干嘛?
if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
for {
n, err = syscall.Read(int(fd.sysfd), p)
if err != nil {
n = 0
// 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
if err == syscall.EAGAIN {
if err = fd.pd.WaitRead(); err == nil {
continue
}
}
}
// 检查错误,封装io.EOF
err = chkReadErr(n, err, fd)
break
}
if err != nil && err != io.EOF {
err = &OpError{"read", fd.net, fd.raddr, err}
}
return
}
func chkReadErr(n int, err error, fd *netFD) error {
if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
return io.EOF
}
return err
}
Read的流程与Accept流程极其一致,阅读起来也很简单。相信不用作过多解释,自己看吧。 需要注意的是每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程序需要能够处理这种情况。
Write
func (fd *netFD) Write(p []byte) (nn int, err error) {
// 为什么这里加写锁
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
// 这个是干什么?
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
// nn记录总共写入的数据量,每次Write可能只能写入部分数据
for {
var n int
n, err = syscall.Write(int(fd.sysfd), p[nn:])
if n > 0 {
nn += n
}
// 如果数组数据已经全部写完,函数返回
if nn == len(p) {
break
}
// 如果写入数据时被block了,阻塞当前协程
if err == syscall.EAGAIN {
if err = fd.pd.WaitWrite(); err == nil {
continue
}
}
if err != nil {
n = 0
break
}
// 如果返回值为0,代表了什么?
if n == 0 {
err = io.ErrUnexpectedEOF
break
}
}
if err != nil {
err = &OpError{"write", fd.net, fd.raddr, err}
}
return nn, err
}
注意Write语义与Read不一样的地方:
Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。
总结
上面我们基本说完了golang网络编程内的关键API流程,我们遗留了一个关键内容:当系统调用返回EAGAIN时,会 调用WaitRead/WaitWrite来阻塞当前协程,我会在接下来的章节中继续分析。