lemon 发布的文章

写在前面

Go中的错误处理与Java、JavaScript或Python等其他主流编程语言略有不同。Go的内置错误不包含堆栈跟踪,也不支持传统的尝试/捕获方法来处理它们。相反,Go中的错误只是函数返回的值,可以用与任何其他数据类型几乎相同的方式来处理它们,这导致了一个惊人的轻量级和简单的设计。

在本文中,我将演示Go中处理错误的基础知识,以及一些可以在代码中遵循的简单策略,以确保程序健壮且易于调试。

error关键字

在golang中error关键字是一个interface,他的定义如下:

type error interface {
    Error() string
}

因此任何实现了interface的类型,都可以有Error() 的表现,它会返回一段字符串,也正是这样简介的表达,让golang中的错误处理异与任何其他编程语言。

实现接口

在golang标准库中,总能让我看到特别优化的设计,就比如如何实现一个错误,官方标准库如下:

- 阅读剩余部分 -

http keep alive

写在开始:

在早期的HTTP/1.0中, 协议是一个建立连接, 交互数据, 断开连接的过程. 然后作为客户端, 我们并不是加载一个uri就完成任务, 客户端需要非常频繁的和同一个服务端交互数据, 这个时候就好比我们有很多很多的电话内容要沟通, 但是又需要一件事情一次电话. 这个能想的到的性能问题.
作为互联网如此重要的协议,几乎承担了互联网绝大多数的应用层流量,为了解决这个问题,HTTP/1.0后期使用请求头: Connection: keep-alive 来告诉对方我请求完毕请不要关闭连接,后面我还有继续复用这个连接, HTTP/1.1 版本则默认使用keep-alive特性(值得注意的是这里的keep-alive特性和tcp的keepalive是完全不同的概念, 解决的也不是同一类问题).

keep-alive特性如何开启

因为golang的client,server端实现均使用HTTP/1.1规范, 所以无论你是否传递Connection: keep-alive头, 均默认开启keep-alive特性.

服务端默认开启keep-alive的源码分析

这一点可以从golang源码中看到:

首先我们看作为服务端, 当启动一个server后, 首先listen, 然后发起server处理:

func ListenAndServe(addr string, handler Handler) error {
    server := &Server{Addr: addr, Handler: handler}
    return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
    if srv.shuttingDown() {
        return ErrServerClosed
    }
    addr := srv.Addr
    if addr == "" {
        addr = ":http"
    }
        // 这里创建tcp链接
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
        // 这里开始接受tcp数据包, 进行处理
    return srv.Serve(ln)
}

之后进入for循环, 一个一个的请求都将启动新的协程去处理, 由此我们也可以发现, 每一个http的请求都是通过新的协程去处理

for {
        rw, err := l.Accept()
                // 此处省略N行代码
        c.setState(c.rwc, StateNew) // before Serve can return
        go c.serve(connCtx)
    }

再进入serve方法后, 可以发现整体就是for循环一直处理该请求链接上的数据, 同时我们可以发现中间有判断doKeepAlives而退出循环, 这也是作为服务端程序, 你如果不想启用http keepalive而可以设置的选项http.Server.SetKeepAlivesEnabled(false)

客户端默认开启keep-alive的源码分析

客户端的源码在net/http/client.go中, 追踪客户端的代码有个核心的思想就是拿req取resp, 在client每一次发起请求都是 client结构体的do方法

if resp, didTimeout, err = c.send(req, deadline); err != nil {

往下追踪后知道client的所有req得到resp都是从 resp, err = rt.RoundTrip(req)发起,这也是设计者设计RoundTripper接口的目的.

最后queueForIdleConn方法可以得知, 我们的客户端在当前client结构体实例上是预先申请好连接的, 所以也就得到了keep-alive的目的.

// queueForIdleConn queues w to receive the next idle connection for w.cm.
// As an optimization hint to the caller, queueForIdleConn reports whether
// it successfully delivered an already-idle connection.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
    if t.DisableKeepAlives {
        return false
    }

    t.idleMu.Lock()
    defer t.idleMu.Unlock()

这是idleConn的直接体现, 同时, 方法一开始就判断了, 如果Transport结构体的实例如果设置了DisableKeepAlives的值为非0值, 则关闭连接池.

关于客户端连接池设计中的几个重要的参数

DisableKeepAlives

这个在上文中已经说过, 就是客户端Transport实例明确告诉服务端, 是否要开启长连接.

MaxConnsPerHost

代表每个客户端可以在一个host上创建最多几个连接

想了解 httpclient的连接池, 必须去阅读getConn方法, 如下有代码注释

// 这里获取连接池中的空闲链接
if delivered := t.queueForIdleConn(w); delivered {
    pc := w.pc
    // Trace only for HTTP/1.
    // HTTP/2 calls trace.GotConn itself.
    if pc.alt == nil && trace != nil && trace.GotConn != nil {
        trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
    }
    // set request canceler to some non-nil function so we
    // can detect whether it was cleared between now and when
    // we enter roundTrip
    t.setReqCanceler(treq.cancelKey, func(error) {})
    return pc, nil
}

cancelc := make(chan error, 1)
t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })

// 如果当前连接池中无空闲链接, 则开始异步创建连接
t.queueForDial(w)

进入 创建连接代码

// queueForDial queues w to wait for permission to begin dialing.
// Once w receives permission to dial, it will do so in a separate goroutine.
func (t *Transport) queueForDial(w *wantConn) {
    w.beforeDial()
        // 如果MaxConnsPerHost不够, 创建
    if t.MaxConnsPerHost <= 0 {
        go t.dialConnFor(w)
        return
    }

    t.connsPerHostMu.Lock()
    defer t.connsPerHostMu.Unlock()

    if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {
        if t.connsPerHost == nil {
            t.connsPerHost = make(map[connectMethodKey]int)
        }
        t.connsPerHost[w.key] = n + 1
        go t.dialConnFor(w)
        return
    }

    if t.connsPerHostWait == nil {
        t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
    }
    q := t.connsPerHostWait[w.key]
    q.cleanFront()
    q.pushBack(w)
    t.connsPerHostWait[w.key] = q
}

总结:

http 作为开发者是最最常用的类库, 其中有一些设计细节是有异于其他http类库的, 我们更需要keep-avlie的设计细节, 关注连接池的使用, 来达到更高的吞吐量, 这些地方优化好, 是可以整体提高我们程序的性能.

背景

由于业务需要,我们在生产环境下利用rabbitmq作为消息通信的服务, 今天发现一个奇怪的现象, 服务端存在大量的Connections,同时有同等多的channels, 但是根据业务上分析, 我们完全用不到这么多的连接, 随后我们进行了排查问题的第一步:

初步思路

打开client所在的服务器

查看连接5672(amqp端口)的tcp数量以及状态

 ss -aoen state established | grep 5672
  ss -aoen state established | grep 5672 | wc -l

这个只是部分截图, 但是通过ss分析,可以肯定两件事情:

  1. 客户端维护的5672 tcp数量是很少的, 也是符合业务场景的
  2. 每一个tcp都维护了正常的keepalive

打开rabbitmq-server所在的服务器

查看连接5672(amqp端口)的tcp数量以及状态

 ss -aoen state established | grep 5672
 ss -aoen state established | grep 5672 | wc -l

通过分析发现:

  1. 服务端tcp数量很大
  2. 服务端tcp没有keepalive

进一步猜想

服务端存活着大量的dead connection, 由于客户端没有正确的关闭, 正常情况下客户端即使没有正常断开连接, 但是应该由服务端心跳检测, 或者借助tcp keepalive来实现服务端断开dead connection.但是这两种方式均为生效

由此有如下猜想:

1.服务端(rabbitmq-server)没有开启tcp keepalive?

  1. 客户端没有配置心跳?

证实猜想

翻阅官方文档, 阅读后, 可以总结为:

作为amqp协议, 鼓励你使用心跳来做conn的维护,客户端 捕获心跳异常来重连, 如果正确配置了心跳,那么服务端在心跳实际发现客户端无响应, 自然会断开.

随后, 去查看php amqp的源码, 发现如下:

    public function __construct(
        $host,
        $port,
        $user,
        $password,
        $vhost = '/',
        $insist = false,
        $login_method = 'AMQPLAIN',
        $login_response = null,
        $locale = 'en_US',
        $connection_timeout = 3.0,
        $read_write_timeout = 3.0,
        $context = null,
        $keepalive = false,
        $heartbeat = 0,
        $channel_rpc_timeout = 0.0,
        $ssl_protocol = null

if ($this->keepalive) {
    $this->enable_keepalive();
}
    protected function enable_keepalive()
    {
        if ($this->protocol === 'ssl') {
            throw new AMQPIOException('Can not enable keepalive: ssl connection does not support keepalive (#70939)');
        }
        .......
        .......

        $socket = socket_import_stream($this->sock);
        socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
    }

由上面的源码可以分析到, php的这个lib, 在默认情况下, 完全禁用了heartbeats以及keepalive, 由此证明猜想.

A zero value indicates that a peer suggests disabling heartbeats entirely. To disable heartbeats, both peers have to opt in and use the value of 0. This is highly recommended against unless the environment is known to use TCP keepalives on every host.

官方文档中明确说明, 除非特殊情况, 否则强烈建议使用心跳.

遗留的疑问

  1. tcp keepalive 还需要服务端应用程序额外开启?

根据php的源码socket_set_option中可以看到需要客户端明确开启keepalive,底层调用 ` socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
` 实现.

额外补充一个golang版本的代码:

这是客户端:

func main() {
    var tcpAddr *net.TCPAddr
    tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:9999")

    conn,err := net.DialTCP("tcp",nil,tcpAddr)
    // 这里设置和不设置影响客户端tcp keepalive
    conn.SetKeepAlive(true)

通过ss查看:

上面是未设置SetKeepAlive, 由此可以看出, 客户端需要明确指定keepalive来使用tcp keepalive机制

总结

整体写了心跳以及tcp keepalive的问题, 也发现了线上php amqplib的默认值导致的该问题, 所以这里修改php的初始化方法, 给心跳分配一个时间(golang版本默认值是15s)

tips

这里涉及到后续要删掉那些死连接, 这里提供一个小小的tips

rabbitmqctl list_connections pid port state user recv_cnt send_cnt send_pend name client_properties | more

这里可以获取client属性,然后匹配php版本,全量关闭,然后重连即可.

Too many open files 是提供http服务的应用程序很容易遇到的问题, 通常是由于系统配置不当或者程序本身打开太多文件导致的, 于是网上有两种提示你解决该类问题的办法.

  1. 你的系统配置的最大文件打开数太低了
  2. 你的程序df泄露了

本篇内容主要围绕第一个原因展开描述,问题先从这个issue开始说起

这里有人回复:

Check your hard and soft limits for file descriptors (respectively ulimit -Hn and ulimit -Hs) and your (linux) kernel settings (cat /pro/sys/fs/file-max) as these may need to be adjusted.

这里提到3个概念

  1. ulimit -Hn
  2. ulimit -Hs
  3. /pro/sys/fs/file-max

带着这3个指标, 我们开始查阅资料

什么是ulimit

help ulimit # 注意这里的命令,请切换到bash上操作,而不是zsh

修改 shell 资源限制
在允许此类控制的系统上,提供对于 shell 及其创建的进程所可用的资源的控制。

由此,我们可以看出, 我们跑在linux上的大部分程序, 都是有shell所创建进程, 那么ulimit就如同一个配置文件, 限制了我们使用系统资源.

如果你使用help ulimit, 可以查看到如下参数配置项

选项:
  -S    使用 `soft'(软)资源限制
  -H    使用 `hard'(硬)资源限制
  -a    所有当前限制都被报告
  -b    套接字缓存尺寸
  -c    创建的核文件的最大尺寸
  -d    一个进程的数据区的最大尺寸
  -e    最高的调度优先级(`nice')
  -f    有 shell 及其子进程可以写的最大文件尺寸
  -i    最多的可以挂起的信号数
  -l    一个进程可以锁定的最大内存尺寸
  -m    最大的内存进驻尺寸
  -n    最多的打开的文件描述符个数
  -p    管道缓冲区尺寸
  -q    POSIX 信息队列的最大字节数
  -r    实时调度的最大优先级
  -s    最大栈尺寸
  -t    最大的CPU时间,以秒为单位
  -u    最大用户进程数
  -v    虚拟内存尺寸
  -x    最大的锁数量
  

这其中, 有我们关心的内容, 软资源和硬资源

什么是soft和hard

这里通过查阅资料整理如下:

  1. 理解soft和hard, 首先要想到是权限需求
  2. soft 必须小于hard
  3. hard严格的设定,必定不能超过这个设定的数值
  4. soft警告的设定,可以超过这个设定值,但是若超过则有警告信息
  5. 一个process可以修改当前process的soft或hard

ulimit的修改与生效

  1. ulimit的值总是继承父进程的设置。
  2. 可以利用ulimit设置当前shell进程
  3. 增加hard, 只能由root来

下面通过几个例子来践行上面的理论知识:

  1. 修改/etc/security/limits.conf
一条记录包含4️列,分别是范围domain(即生效的范围,可以是用户名、group名或*代表所有非root用户);t类型type:即soft、hard,或者-代表同时设置soft和hard;项目item,即ulimit中的资源控制项目,名字枚举可以参考文件中的注释;最后就是value。
* hard nofile 1000000 
* soft nofile 1000

Question: 修改后不重启当前shell, limits的修改是否生效
A: 否

  1. 当前shell配置ulimit
#!/bin/bash
ulimit -Sn 2045
nc -l 8888

然后ps -ef 查看nc的进程, 然后 cat /proc/{$pid}/limits

Max open files 2045 1000000 files

可以看到这里已经通过shell修改了soft nofile

修改系统最大的df数

除了ulimit控制外,/proc/sys/fs/file-max这个文件控制了系统内核可以打开的全部文件总数。所以,即便是ulimit里nofile设置为ulimited,也还是受限的。

系统级别: /proc/sys/fs/file-max

用户级别: /etc/security/limits.conf

单个进程: fs.nr_open

ulimit 常见命令

  • ulimit -a # 查看所有soft值
  • ulimit -Ha # 查看所有hard值
  • ulimit -Hn # 查看nofile的hard值
  • ulimit -Sn 1000 # 将nofile的soft值设置为1000
  • ulimit -n 1000 # 同时将nofiles的hard和soft值设置为1000

参考资料:
https://juejin.im/post/5d4cf32f6fb9a06b1d21312c
https://blog.csdn.net/liuleinevermore/article/details/83473919

love

前述

Go1.14已经发布有一阵子了,下面是我学习的过程,也是知识的整理.

下载地址

下载地址

详细说明

可查看本地安装之后的doc文件夹,里面包含了go1.14.html文件,直接在本地打开即可食用

哪些地方做了优化

工具层面

go module

1.Flags
-modfile=file,现在可以直接指定mod file了,不在是固定的项目目录下了

语言&&Runtime

聚合接口

在1.14之前,我们聚合接口的定义不能有相同签名和返回值的方法,但是1.14取消了这个限制.

type Foo interface {
    GetName() string
    SetFooName(string)
}

type Bar interface {
    GetName() string
    SetBarName() string
}

type FooBar interface {
    Foo
    Bar
}
defer零消耗

1.14之前,defer的runtime要维护堆栈,我们要压栈和出栈的性能损耗,但是现在在编译器层做了defer的优化

goroutine 基于信号的抢占式调度实现

经典的例子,设置p为1,这样规避并发,然后go协程初始化到队列等待runtime调度,然而for循环的存在,调度一直调度不到,然后现在1.14可以利用信号机制抢占for的循环资源了.

package main

import (
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(1)

    go func() {
        panic("already call")
    }()

    for {
    }
}

思考的问题:抢占是否影响性能 ?

抢占通常是由于阻塞性系统调用引起的,例如磁盘io,cgo等,所以一定程度上很难形成协程阻塞调度.

计时器优化,内存分配器

这里主要学习 这篇文章

包层面

test包
func TestIt(t *testing.T) {
    t.Cleanup(func() {
        fmt.Println("clean up 一些事情")
    })
    
    t.Log("这是个log")
    time.Sleep(time.Second * 2)
}

go1.14 会在sleep之前就输出log日志.同时增加了Cleanup方法(这个方法很有用,在我们测试完一个用例后可以做一些善后处理,例如rollback db等)

hash/maphash

降低了hash冲突的可能性.

json

InputOffset()方法

func main() {
    const jsonStr = `
        {"Job":"java", "Name":"zhangsan"}
        {"Job":"php", "Name":"lisi"},
        {"Job":"python", "Name":"wangwu"}
`
    type Jober struct {
        Job, Name string
    }

    dec := json.NewDecoder(strings.NewReader(jsonStr))
    for {
        var m Jober
        if err := dec.Decode(&m); err == io.EOF {
            break
        } else if err != nil {
            log.Fatal(err)
        }
        fmt.Println(m)
        fmt.Println(dec.InputOffset())
    }

}

输出:

{java zhangsan}
36
{php lisi}
67
2020/03/11 21:39:57 invalid character ',' looking for beginning of value

这里我特意后面多加了',', 可以看的出来解析的时候加入InputOffset很有用

ioutil

TempDir 第二个参数,随机字符串模式,可以是一个更加定制化的模式

    ioutil.TempDir("/tmp/ss", "name-date-*-test")

这里的 * 可以在一个字符串模式的中间

log
logger = log.New(&buf, "logger", log.Lmsgprefix)

文件名和行数放在prefix的前面

reflect包

StructOf 可以设置私有字段了

func main() {
    typ := reflect.StructOf([]reflect.StructField{
        {
            Name: "Height",
            Type: reflect.TypeOf(float64(0)),
            Tag:  `json:"height"`,
        },
        {
            Name: "age",
            Type: reflect.TypeOf(int(0)),
            Tag:  `json:"age"`,
            PkgPath: "other/path",
        },
    })

    v := reflect.New(typ).Elem()
    v.Field(0).SetFloat(0.4)
    v.Field(1).SetInt(2)
    s := v.Addr().Interface()

    w := new(bytes.Buffer)
    if err := json.NewEncoder(w).Encode(s); err != nil {
        panic(err)
    }

    fmt.Printf("value: %+v\n", s)
    fmt.Printf("json:  %s", w.Bytes())

    r := bytes.NewReader([]byte(`{"height":1.5,"age":10}`))
    if err := json.NewDecoder(r).Decode(s); err != nil {
        panic(err)
    }
    fmt.Printf("value: %+v\n", s)

}

这里的age是一个不可导出的私有属性,但是我同样可以在structof中设置

runtime

Goexit()

func todo() {
    defer func() {
        fmt.Println(recover())
    }()
    defer panic("cancelled goexit")
    runtime.Goexit()
}

func main() {
    todo()
}

这里在1.13中,由于goexit退出当前协程而报错,但是此时会被recover住,但是1.14将会顺利报错.

strconv包

strconv包中的NumError类型现在支持is判断不同的类型,代码如下:

    str := "hello world"
    if _, err := strconv.ParseFloat(str, 64); err != nil {
        if errors.Is(err, strconv.ErrSyntax) {

        }
        if errors.Is(err, strconv.ErrRange) {

        }
    }