DialOptions

DialOptions 是最重要的一环,负责配置每一次 rpc 请求的时候的一应选择。


结构

先来看看这个的结构

链接

// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
	unaryInt  UnaryClientInterceptor
	streamInt StreamClientInterceptor

	chainUnaryInts  []UnaryClientInterceptor
	chainStreamInts []StreamClientInterceptor

	cp          Compressor
	dc          Decompressor
	bs          backoff.Strategy
	block       bool
	insecure    bool
	timeout     time.Duration
	scChan      <-chan ServiceConfig
	authority   string
	copts       transport.ConnectOptions
	callOptions []CallOption
	// This is used by v1 balancer dial option WithBalancer to support v1
	// balancer, and also by WithBalancerName dial option.
	balancerBuilder balancer.Builder
	// This is to support grpclb.
	resolverBuilder             resolver.Builder
	channelzParentID            int64
	disableServiceConfig        bool
	disableRetry                bool
	disableHealthCheck          bool
	healthCheckFunc             internal.HealthChecker
	minConnectTimeout           func() time.Duration
	defaultServiceConfig        *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
	defaultServiceConfigRawJSON *string
}

由于命名非常规范,加上注释很容易看懂每一个 field 配置的哪一条属性。如果掠过看的 大概有 压缩解压器,超时阻塞设置,认证安全转发,负载均衡,服务持久化的信息存储 ,配置,心跳检测等。


其一应函数方法都是设置 其中字段的。


如何设置

这里是 grpc 设计较好的地方,通过函数设置,同时设有生成函数的函数。什么意思呢?首先结合图来理解,这也是整个 grpc 设置的精华部分 


grpc-setOperation.svg

这里的意思是 , DialOptions 是一个导出接口,实现函数是 apply 同时接受参数 dialOptions 来修改它。

而实际上,是使用 newFuncDialOption 函数包装一个 修改 dialOptions 的方法给 funcDialOption 结构体,在实际 Dial 调用的时候 是使用闭包 调用 funcDialOption 结构体的 apply 方法。

可以在这里看一下 Dial 方法的源码(Dial 调用的是 DialContext

起作用的就是 opt.apply()

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	···
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}
    ···
}


这里的 options 可以说是 client 发起 rpc 请求的核心中转站。

另一个重要的接口,同时也集中在 dialOptions 结构体中初始化处理的是 

callOptions []CallOption 


CallOption

CallOption 是一个接口,定义在 rpc_util 包内


结构

// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
	// before is called before the call is sent to any server.  If before
	// returns a non-nil error, the RPC fails with that error.
	before(*callInfo) error

	// after is called after the call has completed.  after cannot return an
	// error, so any failures should be reported via output parameters.
	after(*callInfo)
}


操作的是 callInfo 结构里的数据,其被包含在 dialOptions  结构体中,

即每一次 dial 的时候进行调用。


callInfo

同时它自身定义很有意思,操作的是 callInfo  结构体

// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
	compressorType        string
	failFast              bool
	stream                ClientStream
	maxReceiveMessageSize *int
	maxSendMessageSize    *int
	creds                 credentials.PerRPCCredentials
	contentSubtype        string
	codec                 baseCodec
	maxRetryRPCBufferSize int
}

可以看到 callInfo 中字段用来表示 单次调用中独有的自定义选项如 压缩,流控,认证,编解码器等。


一个实现

简单看一个 CallOption 接口的实现

// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
	return HeaderCallOption{HeaderAddr: md}
}

// HeaderCallOption is a CallOption for collecting response header metadata.
// The metadata field will be populated *after* the RPC completes.
// This is an EXPERIMENTAL API.
type HeaderCallOption struct {
	HeaderAddr *metadata.MD
}

func (o HeaderCallOption) before(c *callInfo) error { return nil }
func (o HeaderCallOption) after(c *callInfo) {
	if c.stream != nil {
		*o.HeaderAddr, _ = c.stream.Header()
	}
}

重点看到,实际操作是在 before 和 after 方法中执行,它们会在 Client 发起请求的时候自动执行,顾名思义,一个在调用前执行,一个在调用后执行。


实现注意

这里可以看出,这里也是通过函数返回一个拥有这两个方法的结构体,注意这一个设计,可以作为你自己的 Option 设计的时候的参考。


两种方法

有两种方法让 Client 接受你的 CallOption 设置

  1. 在 Client 使用方法的时候直接作为 参数传递,将刚才所说的函数-返回一个实现了 CallOption 接口的结构体。
  2. 在 生成 Client 的时候就传递设置。具体如下
  1. 通过 dialOptions.go 中的 函数 grpc.WithDefaultCallOptions() 
  2. 这个函数会将 CallOption 设置到 dialOptions 中的字段 []CallOption 中。
// WithDefaultCallOptions returns a DialOption which sets the default
// CallOptions for calls over the connection.
func WithDefaultCallOptions(cos ...CallOption) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.callOptions = append(o.callOptions, cos...)
	})
}


有没有感觉有点不好理解?给你们一个实例

  1. 使用的第一种方法
response, err := myclient.MyCall(ctx, request, grpc.CallContentSubtype("mycodec"))
  1. 使用第二种方法
myclient := grpc.Dial(ctx, target, grpc.WithDefaultCallOptions(grpc.CallContentSubtype("mycodec")))

这里假设 我们设置了一个 mycodec 的译码器。马上下面解释它的设计。


值得注意的是, 我好像只提到了在 Client 调用时设置,callOption  只在客户端设置的情况是不是让大家感到困惑。

实际上 gRPC server 端会自动检测 callOption 的设置,并检测自己是否支持此项选择,如果不支持则会返回失败。也就是说,在 Server 端注册的所有 Codec 译码器之后,Client 直接使用相应的设置就好了。


Codec

在 gRPC 中 Codec 有两个接口定义,一个是 baseCodec 包含正常的 Marshal 和 Unmarshal 方法,另一个是拥有名字的 Codec 定义在 encoding 包内,这是由于在注册 registry 的时候会使用到这个方法。


接口

type Codec interface {
	// Marshal returns the wire format of v.
	Marshal(v interface{}) ([]byte, error)
	// Unmarshal parses the wire format into v.
	Unmarshal(data []byte, v interface{}) error
	// String returns the name of the Codec implementation.  This is unused by
	// gRPC.
	String() string
}

就是这个方法

// RegisterCodec registers the provided Codec for use with all gRPC clients and
// servers.
//
// The Codec will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the Codec.  This
// is case-insensitive, and is stored and looked up as lowercase.  If the
// result of calling Name() is an empty string, RegisterCodec will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe.  If multiple Compressors are
// registered with the same name, the one registered last will take effect.
func RegisterCodec(codec Codec) {
	if codec == nil {
		panic("cannot register a nil Codec")
	}
	if codec.Name() == "" {
		panic("cannot register Codec with empty string result for Name()")
	}
	contentSubtype := strings.ToLower(codec.Name())
	registeredCodecs[contentSubtype] = codec
}


Compressor

同时 encoding 包中还定义了 Compressor 接口,参照 Codec 理解即可。

// Compressor is used for compressing and decompressing when sending or
// receiving messages.
type Compressor interface {
	// Compress writes the data written to wc to w after compressing it.  If an
	// error occurs while initializing the compressor, that error is returned
	// instead.
	Compress(w io.Writer) (io.WriteCloser, error)
	// Decompress reads data from r, decompresses it, and provides the
	// uncompressed data via the returned io.Reader.  If an error occurs while
	// initializing the decompressor, that error is returned instead.
	Decompress(r io.Reader) (io.Reader, error)
	// Name is the name of the compression codec and is used to set the content
	// coding header.  The result must be static; the result cannot change
	// between calls.
	Name() string
}


MetaData 

这个包对应 context 中的 Value field 也就是 key-value 形式的存储


在其他包中简写是 MD


结构

type MD map[string][]string


函数

实现了完善的存储功能,从单一读写到批量(采用 pair 模式,...string 作为参数,len(string)%2==1 时会报错,由于会有孤立的没有配对的元信息。


另外几个函数是实现了从 context 中的读取和写入(这里的写入是 使用 context.WithValue 方法,即生成 parent context 的 copy。


注意⚠️


Refer

gRPC 官方也有 MetaData 的解释