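When we talk about netpoll, we usually mean the mechanism in the Go runtime that uses epoll to watch many sockets at once and wakes the specific goroutine waiting on a socket when data arrives. The code lives in runtime/netpoll.go and runtime/netpoll_epoll.go (only Linux is considered here). To support this, the runtime provides two groups of functions:
Group 1: functions called by the Go runtime itself.
Group 2: functions called by internal/poll, net, net/http, and so on.
The functions in the second group are linked to internal/poll.runtime_xxx, for example runtime_pollServerInit and runtime_pollOpen.
The rest of this article walks through the main functions.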
netpollGenericInit makes sure the poller is initialized; the atomic variable netpollInited guarantees that initialization happens only once.
func netpollGenericInit() {
	if atomic.Load(&netpollInited) == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited == 0 {
			netpollinit()
			atomic.Store(&netpollInited, 1)
		}
		unlock(&netpollInitLock)
	}
}
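This function is only a shell: the real initialization is wrapped in netpollinit, whose implementation is platform-specific. On Linux, the code that follows does three things: it creates an epoll instance, creates a non-blocking pipe, and registers the read end of the pipe with epoll.
With this dedicated pipe in place, the runtime can interrupt epoll_wait on demand and make netpoll return immediately.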
func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	if epfd < 0 {
		epfd = epollcreate(1024)
		if epfd < 0 {
			println("runtime: epollcreate failed with", -epfd)
			throw("runtime: netpollinit failed")
		}
		closeonexec(epfd)
	}
	r, w, errno := nonblockingPipe()
	if errno != 0 {
		println("runtime: pipe failed with", -errno)
		throw("runtime: pipe failed")
	}
	ev := epollevent{
		events: _EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
	errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
	if errno != 0 {
		println("runtime: epollctl failed with", -errno)
		throw("runtime: epollctl failed")
	}
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}
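netpoll checks for network connections that are ready. On the happy path it calls epollwait, walks the returned events, works out the read/write mode of each one, and returns the goroutines waiting on the corresponding pollDesc as a gList.
The pollDesc struct contains two semaphore fields, rg and wg. Each of them can be in one of four states: nil, pdReady, pdWait, or a pointer to the blocked goroutine.
A few helper functions operate on these fields, among them netpollblock and netpollunblock.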
The code of netpoll is in runtime/netpoll_epoll.go. Part of it is shown below:
func netpoll(delay int64) gList {
	// An epoll fd of -1 means there is nothing to poll.
	if epfd == -1 {
		return gList{}
	}
	var waitms int32
	// ... some code omitted
	var events [128]epollevent
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		if n != -_EINTR {
			println("runtime: epollwait on fd", epfd, "failed with", -n)
			throw("runtime: netpoll failed")
		}
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		if waitms > 0 {
			return gList{}
		}
		goto retry
	}
	var toRun gList
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
			// ... the read end of the break pipe has data;
			// no goroutine needs to be woken up.
			continue
		}
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			pd.setEventErr(ev.events == _EPOLLERR)
			// Mark the waiting goroutine as pdReady
			// and append it to toRun.
			netpollready(&toRun, pd, mode)
		}
	}
	return toRun
}
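Note: netpollready uses netpollunblock to change the goroutine's state and adds it to the list of I/O-ready goroutines.
When the runtime calls netpoll, it usually does so in non-blocking mode (delay=0). Only in the last stage of findrunnable does it check whether another M is already doing network polling; if none is, it blocks for the duration given by the delay argument.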
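The mode computation inside netpoll's event loop can be pulled out into a small standalone function. This is a sketch for illustration: `modeFor` and `describe` are hypothetical helpers, and the constants are the Linux values of the epoll event bits written out by hand instead of the runtime's `_EPOLL*` names.

```go
package main

import "fmt"

// Linux values of the epoll event bits used by netpoll.
const (
	epollIn    uint32 = 0x1
	epollOut   uint32 = 0x4
	epollErr   uint32 = 0x8
	epollHup   uint32 = 0x10
	epollRdHup uint32 = 0x2000
)

// modeFor maps an epoll event mask to 'r', 'w', 'r'+'w', or 0,
// using the same two tests as the loop in netpoll.
func modeFor(events uint32) int32 {
	var mode int32
	if events&(epollIn|epollRdHup|epollHup|epollErr) != 0 {
		mode += 'r'
	}
	if events&(epollOut|epollHup|epollErr) != 0 {
		mode += 'w'
	}
	return mode
}

// describe turns a mode value into a readable label.
func describe(mode int32) string {
	switch mode {
	case 'r':
		return "read"
	case 'w':
		return "write"
	case 'r' + 'w':
		return "read+write"
	}
	return "none"
}

func main() {
	for _, ev := range []uint32{epollIn, epollOut, epollIn | epollOut, epollHup, 0} {
		fmt.Printf("events=%#x -> %s\n", ev, describe(modeFor(ev)))
	}
}
```

Note that an error or hang-up bit sets both modes, so a goroutine blocked on either a read or a write gets woken and can observe the failure.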
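netpollBreak does something simple, but its implementation is interesting. It cooperates with netpoll through the variable netpollWakeSig; because the two can run concurrently on different threads, every access to this variable is atomic.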
// netpollBreak interrupts an epollwait.
func netpollBreak() {
	if atomic.Cas(&netpollWakeSig, 0, 1) {
		for {
			var b byte
			n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
			if n == 1 {
				break
			}
			if n == -_EINTR {
				continue
			}
			if n == -_EAGAIN {
				return
			}
			println("runtime: netpollBreak write failed with", -n)
			throw("runtime: netpollBreak write failed")
		}
	}
}
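The logic of poll_runtime_pollOpen falls into three parts: allocate a pollDesc from pollcache, initialize its fields under the lock, and call netpollopen to register the fd with epoll.
poll_runtime_pollOpen is implemented in runtime/netpoll.go. Its main logic is as follows: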
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	lock(&pd.lock)
	wg := pd.wg.Load()
	if wg != 0 && wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	rg := pd.rg.Load()
	if rg != 0 && rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
	pd.fd = fd
	// ... some initialization logic omitted
	unlock(&pd.lock)
	errno := netpollopen(fd, pd)
	if errno != 0 {
		pollcache.free(pd)
		return nil, int(errno)
	}
	return pd, 0
}
// In runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
poll_runtime_pollWait is little more than a wrapper around netpollblock that adds error handling. Note that this function is triggered by user code, not by the runtime.
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
	errcode := netpollcheckerr(pd, int32(mode))
	if errcode != pollNoError {
		return errcode
	}
	// As for now only Solaris, illumos, and AIX use level-triggered IO.
	if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
		netpollarm(pd, mode)
	}
	for !netpollblock(pd, int32(mode), false) {
		errcode = netpollcheckerr(pd, int32(mode))
		if errcode != pollNoError {
			return errcode
		}
		// Can happen if timeout has fired and unblocked us,
		// but before we had a chance to run, timeout has been reset.
		// Pretend it has not happened and retry.
	}
	return pollNoError
}
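Next, let's look at how user programs trigger the poll_runtime_xxx family of functions. First, sockets fall into two kinds: LISTEN sockets (server sockets) and ESTABLISHED sockets (TCPConn).
From the HTTP server's point of view, a LISTEN socket is registered with epoll along the following call chain: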
// net/http/server.go
func ListenAndServe(addr string, handler Handler) error
// net/http/server.go
func (srv *Server) ListenAndServe() error
// net/dial.go
func Listen(network, address string) (Listener, error) {
	var lc ListenConfig
	return lc.Listen(context.Background(), network, address)
}
// net/dial.go
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error)
// net/tcpsock_posix.go
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error)
// net/ipsock_posix.go
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error)
// net/sock_posix.go
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error)
// net/sock_posix.go
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
	// ... some code omitted
	if err = fd.init(); err != nil {
		return err
	}
	// ... some code omitted
}
// net/fd_unix.go
func (fd *netFD) init() error {
	// fd.pfd has type poll.FD
	return fd.pfd.Init(fd.net, true)
}
// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error {
	// We don't actually care about the various network types.
	if net == "file" {
		fd.isFile = true
	}
	if !pollable {
		fd.isBlocking = 1
		return nil
	}
	err := fd.pd.init(fd)
	if err != nil {
		// If we could not initialize the runtime poller,
		// assume we are using blocking mode.
		fd.isBlocking = 1
	}
	return err
}
// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}
The HTTP server then accepts new TCP connections along this call chain:
// net/http/server.go
func (srv *Server) Serve(l net.Listener) error {
	for {
		rw, err := l.Accept()
		// ... some code omitted
// net/tcpsock.go
func (l *TCPListener) Accept() (Conn, error)
// net/tcpsock_posix.go
func (ln *TCPListener) accept() (*TCPConn, error) {
	fd, err := ln.fd.accept()
	// ... some code omitted
// net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
	d, rsa, errcall, err := fd.pfd.Accept()
	// ... some code omitted
	if err = netfd.init(); err != nil
	// ... some code omitted
// internal/poll/fd_unix.go
func (fd *FD) Init(net string, pollable bool) error
// internal/poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error
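netFD wraps a socket (a network file descriptor). For a server socket, the accept method obtains a new TCP connection (an ESTABLISHED socket) from the server socket (the LISTEN socket). The ESTABLISHED socket returned by the Linux accept system call is just an int; newFD and init wrap it into a complete netFD struct, which is later wrapped into a net.TCPConn.
To the operating system, a LISTEN socket and an ESTABLISHED socket are both just int file descriptors, with no essential difference. The accept and read system calls both read data from a socket, so both kinds of descriptor are watched together in the same epoll instance.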
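That both kinds of socket are plain descriptors can be observed from user code through the public `SyscallConn` method of `net.TCPListener` and `net.TCPConn`. The following is a small sketch on a Unix-like system, not part of the article's call chain: `rawFD` and `listenerAndConnFDs` are hypothetical helpers, and it assumes loopback networking is available.

```go
package main

import (
	"fmt"
	"net"
	"syscall"
)

// rawConner is satisfied by *net.TCPListener and *net.TCPConn.
type rawConner interface {
	SyscallConn() (syscall.RawConn, error)
}

// rawFD returns the OS-level descriptor number behind a listener or conn.
func rawFD(c rawConner) (uintptr, error) {
	rc, err := c.SyscallConn()
	if err != nil {
		return 0, err
	}
	var fd uintptr
	if err := rc.Control(func(f uintptr) { fd = f }); err != nil {
		return 0, err
	}
	return fd, nil
}

// listenerAndConnFDs opens a loopback listener, accepts one connection,
// and returns the descriptors of the LISTEN and ESTABLISHED sockets.
func listenerAndConnFDs() (uintptr, uintptr, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, 0, err
	}
	defer ln.Close()

	// A throwaway client so that Accept has something to return.
	go func() {
		if c, err := net.Dial("tcp", ln.Addr().String()); err == nil {
			c.Close()
		}
	}()

	conn, err := ln.Accept()
	if err != nil {
		return 0, 0, err
	}
	defer conn.Close()

	lfd, err := rawFD(ln.(*net.TCPListener))
	if err != nil {
		return 0, 0, err
	}
	cfd, err := rawFD(conn.(*net.TCPConn))
	if err != nil {
		return 0, 0, err
	}
	return lfd, cfd, nil
}

func main() {
	lfd, cfd, err := listenerAndConnFDs()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("listener fd:", lfd, "accepted conn fd:", cfd)
}
```

The two numbers differ only in value; nothing in their type says which one is listening and which one carries an established connection.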
Here is the definition of netFD and the implementation of its accept method:
// Network file descriptor.
type netFD struct {
	pfd poll.FD
	// immutable until Close
	family      int
	sotype      int
	isConnected bool // handshake completed or use of association with peer
	net         string
	laddr       Addr
	raddr       Addr
}
func (fd *netFD) accept() (netfd *netFD, err error) {
	d, rsa, errcall, err := fd.pfd.Accept()
	if err != nil {
		if errcall != "" {
			err = wrapSyscallError(errcall, err)
		}
		return nil, err
	}
	if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
		poll.CloseFunc(d)
		return nil, err
	}
	if err = netfd.init(); err != nil {
		netfd.Close()
		return nil, err
	}
	lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
	return netfd, nil
}
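net.netFD relies on poll.FD for polling. The difference is what the names suggest: net.netFD wraps network-specific functionality, while poll.FD is a more general FD that wraps the operations available on any file descriptor. It is defined as follows: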
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
	// Lock sysfd and serialize access to Read and Write methods.
	fdmu fdMutex
	// System file descriptor. Immutable until Close.
	Sysfd int
	// I/O poller.
	pd pollDesc
	// Writev cache.
	iovecs *[]syscall.Iovec
	// Semaphore signaled when file is closed.
	csema uint32
	// Non-zero if this file has been set to blocking mode.
	isBlocking uint32
	// Whether this is a streaming descriptor, as opposed to a
	// packet-based descriptor like a UDP socket. Immutable.
	IsStream bool
	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool
	// Whether this is a file rather than a network socket.
	isFile bool
}
poll.FD in turn relies on poll.pollDesc, which implements I/O polling. Its methods, such as init, wait, close, and prepare, are all wrappers around the runtime_pollXXX family of functions. Part of pollDesc's logic is shown below:
type pollDesc struct {
	runtimeCtx uintptr
}
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
	serverInit.Do(runtime_pollServerInit)
	ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	if errno != 0 {
		return errnoErr(syscall.Errno(errno))
	}
	pd.runtimeCtx = ctx
	return nil
}