当前位置: 首页 > news >正文

seo关键词选择及优化/seo优化宣传

seo关键词选择及优化,seo优化宣传,淘客网站模版,广州h5页面设计go语言grpc之server端源码分析二DialContextparseTargetAndFindResolvergetResolvernewCCResolverWrapperccResolverWrapper.UpdateStatecc.maybeApplyDefaultServiceConfigccBalancerWrapper.updateClientConnState上一篇文章分析了ClientConn的主要结构体成员,然后…

go语言grpc之server端源码分析二

  • DialContext
    • parseTargetAndFindResolver
      • getResolver
    • newCCResolverWrapper
      • ccResolverWrapper.UpdateState
        • cc.maybeApplyDefaultServiceConfig
        • ccBalancerWrapper.updateClientConnState

上一篇文章分析了ClientConn的主要结构体成员,然后接下来看一下对应的实现也就是DialContext方法。

DialContext

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target:            target,csMgr:             &connectivityStateManager{},conns:             make(map[*addrConn]struct{}),dopts:             defaultDialOptions(),blockingpicker:    newPickerWrapper(),czData:            new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}// 初始化ctxcc.ctx, cc.cancel = context.WithCancel(context.Background())// 加载额外的配置for _, opt := range opts {opt.apply(&cc.dopts)}// 将额外的配置串起来chainUnaryClientInterceptors(cc)chainStreamClientInterceptors(cc)defer func() {if err != nil {cc.Close()}}() // 删掉一些用不到的ssl的配置// 设置cc.dopts.copts.UserAgent 为 "grpc-go/1.45.0"if cc.dopts.copts.UserAgent != "" {cc.dopts.copts.UserAgent += " " + grpcUA} else {cc.dopts.copts.UserAgent = grpcUA}// 获取使用的resolverBuilderresolverBuilder, err := cc.parseTargetAndFindResolver()if err != nil {return nil, err}// 获取authority 这里是 localtion:8002cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)if err != nil {return nil, err}// 初始化balancercc.balancerBuildOpts = balancer.BuildOptions{DialCreds:        credsClone,CredsBundle:      cc.dopts.copts.CredsBundle,Dialer:           cc.dopts.copts.Dialer,Authority:        cc.authority,CustomUserAgent:  cc.dopts.copts.UserAgent,ChannelzParentID: cc.channelzID,Target:           cc.parsedTarget,}// 对resolverBuilder增加覆盖初始化rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()return cc, nil
}

上面的是在删除了很多不用的代码精简后的结果,然后看一下这个主要是下面的两个方法。

  • parseTargetAndFindResolver
  • newCCResolverWrapper

parseTargetAndFindResolver

首先看一下代码的实现

func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)// 接下targetvar rb resolver.BuilderparsedTarget, err := parseTarget(cc.target)if err != nil {channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)} else {// 根据scheme获取Resolver 如果存在那么就直接返回rb = cc.getResolver(parsedTarget.Scheme)if rb != nil {cc.parsedTarget = parsedTargetreturn rb, nil}}// 如果没有对应的resolver那么就使用 passthrough 对应的resolver//获取默认的scheme 这里就是passthroughdefScheme := resolver.GetDefaultScheme()channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)// 这里的canonicalTarget就是passthrough:///localhost:8002canonicalTarget := defScheme + ":///" + cc.target// 根据canonicalTarget 去解析目标parsedTarget, err = parseTarget(canonicalTarget)if err != nil {return nil, err}// 回去的就是passthrough对应的Resolverrb = cc.getResolver(parsedTarget.Scheme)if rb == nil {return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)}// 添加到parsedTarget中去cc.parsedTarget = parsedTargetreturn rb, nil
}

然后看一下parseTarget这个,其实就是如果我们传入的scheme://host:port.然后解析到Target结构体,我们在上一篇文章也说了,也就是

type Target struct {// Deprecated: use URL.Scheme instead.Scheme string// Deprecated: use URL.Host instead.Authority string// Deprecated: use URL.Path or URL.Opaque instead. The latter is set when// the former is empty.Endpoint string// URL contains the parsed dial target with an optional default scheme added// to it if the original dial target contained no scheme or contained an// unregistered scheme. Any query params specified in the original dial// target can be accessed from here.URL url.URL
}

getResolver

然后就是根据getResolver获取对应的Builder。先看一下对应的方法

func (cc *ClientConn) getResolver(scheme string) resolver.Builder {for _, rb := range cc.dopts.resolvers {if scheme == rb.Scheme() {return rb}}return resolver.Get(scheme)
}

然后Build的实现是

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {// Build creates a new resolver for the given target.//// gRPC dial calls Build synchronously, and fails if the returned error is// not nil.Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)// Scheme returns the scheme supported by this resolver.// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.Scheme() string
}

而这里的ClientConn是一个interface,实现是

// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {// UpdateState updates the state of the ClientConn appropriately.UpdateState(State) error// ReportError notifies the ClientConn that the Resolver encountered an// error.  The ClientConn will notify the load balancer and begin calling// ResolveNow on the Resolver with exponential backoff.ReportError(error)// NewAddress is called by resolver to notify ClientConn a new list// of resolved addresses.// The address list should be the complete list of resolved addresses.//// Deprecated: Use UpdateState instead.NewAddress(addresses []Address)// NewServiceConfig is called by resolver to notify ClientConn a new// service config. The service config should be provided as a json string.//// Deprecated: Use UpdateState instead.NewServiceConfig(serviceConfig string)// ParseServiceConfig parses the provided service config and returns an// object that provides the parsed config.ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}

然后 Resolver的实现前面提到过,这里是


// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {// ResolveNow will be called by gRPC to try to resolve the target name// again. It's just a hint, resolver can ignore this if it's not necessary.//// It could be called multiple times concurrently.ResolveNow(ResolveNowOptions)// Close closes the resolver.Close()
}

然后 这里

func Register(b Builder) {m[b.Scheme()] = b
}
var (// m is a map from scheme to resolver builder.m = make(map[string]Builder)// defaultScheme is the default scheme to use.defaultScheme = "passthrough"
)

可以看到这里默认的Scheme 就是passthrough。然后看一下在哪里将passthrough的resolver进行注册。

const scheme = "passthrough"type passthroughBuilder struct{}func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {r := &passthroughResolver{target: target,cc:     cc,}r.start()return r, nil
}func (*passthroughBuilder) Scheme() string {return scheme
}type passthroughResolver struct {target resolver.Targetcc     resolver.ClientConn
}func (r *passthroughResolver) start() {r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}func (*passthroughResolver) Close() {}func init() {resolver.Register(&passthroughBuilder{})
}

可以看出来这里是利用了init方法注册了,然后返回的是passthroughBuilder这个方法.

所以这里parseTargetAndFindResolver方法也就说完了,返回的就是passthroughBuilder结构体,这个里面的build方法我们放到后面再说。

newCCResolverWrapper

然后这个方法有两个参数,第一个是cc也就是在DialContext方法刚开始就初始的ClientConn,然后就是resolverBuilder,也就是passthroughBuilder。然后看一下这个方法的实现。

// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {// 初始ccResolverWrapperccr := &ccResolverWrapper{cc:   cc,done: grpcsync.NewEvent(),}var credsClone credentials.TransportCredentialsif creds := cc.dopts.copts.TransportCredentials; creds != nil {credsClone = creds.Clone()}rbo := resolver.BuildOptions{DisableServiceConfig: cc.dopts.disableServiceConfig,DialCreds:            credsClone,CredsBundle:          cc.dopts.copts.CredsBundle,Dialer:               cc.dopts.copts.Dialer,}var err error// We need to hold the lock here while we assign to the ccr.resolver field// to guard against a data race caused by the following code path,// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up// accessing ccr.resolver which is being assigned here.ccr.resolverMu.Lock()defer ccr.resolverMu.Unlock()// 调用传入的resolver.Builder的build方法ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)if err != nil {return nil, err}return ccr, nil
}

然后这个方法的逻辑就是初始化ccResolverWrapper,然后就是调用resolver.Builder的build获取resolver,放入到ccResolverWrapper,然后在把ccResolverWrapper返回。
因为这里的rb其实就是passthroughResolver,然后看一下这个的build方法。
根据上面的源码passthroughResolver的Build其实就是初始化passthroughResolver,然后调用传入的ccr的UpdateState方法。参数就是resolver.State。


func (r *passthroughResolver) start() {r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}

因为这里cc就是ccr也就是ccResolverWrapper.所以看一下ccResolverWrapper的UpdateState方法。

ccResolverWrapper.UpdateState

先看一下源码的实现

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {ccr.incomingMu.Lock()defer ccr.incomingMu.Unlock()ccr.curState = sif err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {return balancer.ErrBadResolverState}return nil
}

这里的ccr.cc就是ClientConn,也就是一开始在DialContext中初始化,然后看一下ClientConn的updateResolverState方法。

func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {defer cc.firstResolveEvent.Fire()cc.mu.Lock()// Check if the ClientConn is already closed. Some fields (e.g.// balancerWrapper) are set to nil when closing the ClientConn, and could// cause nil pointer panic if we don't have this check.if cc.conns == nil {cc.mu.Unlock()return nil}// 删除err不为nil的逻辑var ret errorif cc.dopts.disableServiceConfig {channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)cc.maybeApplyDefaultServiceConfig(s.Addresses)} else if s.ServiceConfig == nil {cc.maybeApplyDefaultServiceConfig(s.Addresses)// TODO: do we need to apply a failing LB policy if there is no// default, per the error handling design?}    cc.blockingpicker.updatePicker(base.NewErrPicker(err))cc.csMgr.updateState(connectivity.TransientFailure)cc.mu.Unlock()return ret}}}var balCfg serviceconfig.LoadBalancingConfigif cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {balCfg = cc.sc.lbConfig.cfg}cbn := cc.curBalancerNamebw := cc.balancerWrappercc.mu.Unlock()if cbn != grpclbName {// Filter any grpclb addresses since we don't have the grpclb balancer.for i := 0; i < len(s.Addresses); {if s.Addresses[i].Type == resolver.GRPCLB {copy(s.Addresses[i:], s.Addresses[i+1:])s.Addresses = s.Addresses[:len(s.Addresses)-1]continue}i++}}uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})if ret == nil {ret = uccsErr // prefer ErrBadResolver state since any other error is// currently meaningless to the caller.}return ret
}

上面的代码很多其实也就是两个方法。
第一个是当 s.ServiceConfig == nil 的时候调用cc.maybeApplyDefaultServiceConfig(s.Addresses).
第一个也就是
bw := cc.balancerWrapper
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

然后接下来看一下这两个方法

cc.maybeApplyDefaultServiceConfig

这里的address就是在初始化的时候传入的 localhost:8002.
然后看一下这个方法的实现

func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {if sc == nil {// should never reach here.return}cc.sc = sc// 生成balancerBuilderif cc.dopts.balancerBuilder == nil {// Only look at balancer types and switch balancer if balancer dial// option is not set.var newBalancerName stringif cc.sc != nil && cc.sc.lbConfig != nil {newBalancerName = cc.sc.lbConfig.name} else {var isGRPCLB boolfor _, a := range addrs {if a.Type == resolver.GRPCLB {isGRPCLB = truebreak}}if isGRPCLB {newBalancerName = grpclbName} else if cc.sc != nil && cc.sc.LB != nil {newBalancerName = *cc.sc.LB} else {newBalancerName = PickFirstBalancerName}}// 生成PickFirstBalancerName的balancecc.switchBalancer(newBalancerName)} else if cc.balancerWrapper == nil {// Balancer dial option was set, and this is the first time handling// resolved addresses. Build a balancer with dopts.balancerBuilder.cc.curBalancerName = cc.dopts.balancerBuilder.Name()cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)}
}

因为cc.dopts.balancerBuilder 这里为nil,同时newBalancerName为else中的逻辑,也就是PickFirstBalancerName也就是pick_first。
所以这个方法的逻辑也就是cc.switchBalancer(“pick_first”)。

// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {if strings.EqualFold(cc.curBalancerName, name) {return}channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)if cc.dopts.balancerBuilder != nil {channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")return}if cc.balancerWrapper != nil {// Don't hold cc.mu while closing the balancers. The balancers may call// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex// would cause a deadlock in that case.cc.mu.Unlock()cc.balancerWrapper.close()cc.mu.Lock()}builder := balancer.Get(name)if builder == nil {channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)builder = newPickfirstBuilder()} else {channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)}cc.curBalancerName = builder.Name()cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

看一下这里的pick_first的实现

// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"func newPickfirstBuilder() balancer.Builder {return &pickfirstBuilder{}
}type pickfirstBuilder struct{}func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {return &pickfirstBalancer{cc: cc}
}

然后看一下newCCBalancerWrapper这个方法

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {ccb := &ccBalancerWrapper{cc:       cc,updateCh: buffer.NewUnbounded(),closed:   grpcsync.NewEvent(),done:     grpcsync.NewEvent(),subConns: make(map[*acBalancerWrapper]struct{}),}go ccb.watcher()ccb.balancer = b.Build(ccb, bopts)_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)return ccb
}

这里是初始化ccBalancerWrapper这个结构体,然后调用build实例化balancer。
然后看一下Build这个方法,也就是pickfirstBuilder。这里的build其实就是返回了pickfirstBalancer这个结构体,看一下实现

type pickfirstBalancer struct {state connectivity.Statecc    balancer.ClientConnsc    balancer.SubConn
}

然后看一下ccb.watcher方法,也就是

// watcher balancer functions sequentially, so the balancer can be implemented
// lock-free.
func (ccb *ccBalancerWrapper) watcher() {for {select {case t := <-ccb.updateCh.Get():ccb.updateCh.Load()if ccb.closed.HasFired() {break}switch u := t.(type) {case *scStateUpdate:ccb.balancerMu.Lock()ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})ccb.balancerMu.Unlock()case *acBalancerWrapper:ccb.mu.Lock()if ccb.subConns != nil {delete(ccb.subConns, u)ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)}ccb.mu.Unlock()case exitIdle:if ccb.cc.GetState() == connectivity.Idle {if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {// We already checked that the balancer implements// ExitIdle before pushing the event to updateCh, but// check conditionally again as defensive programming.ccb.balancerMu.Lock()ei.ExitIdle()ccb.balancerMu.Unlock()}}default:logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)}case <-ccb.closed.Done():}if ccb.closed.HasFired() {ccb.balancerMu.Lock()ccb.balancer.Close()ccb.balancerMu.Unlock()ccb.mu.Lock()scs := ccb.subConnsccb.subConns = nilccb.mu.Unlock()ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})ccb.done.Fire()// Fire done before removing the addr conns.  We can safely unblock// ccb.close and allow the removeAddrConns to happen// asynchronously.for acbw := range scs {ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)}return}}
}

这里主要是scStateUpdate这个case,这里可以看出来当状态有更新的时候,会调用对应balance的UpdateSubConnState方法,在这里的实现是pickfirst是

func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {if logger.V(2) {logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)}if b.sc != sc {if logger.V(2) {logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized")}return}b.state = s.ConnectivityStateif s.ConnectivityState == connectivity.Shutdown {b.sc = nilreturn}switch s.ConnectivityState {case connectivity.Ready:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})case connectivity.Connecting:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})case connectivity.Idle:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})case connectivity.TransientFailure:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState,Picker:            &picker{err: s.ConnectionError},})}
}

注意这里的cc是ccBalancerWrapper,所以也就是调用ccBalancerWrapper的UpdateState方法,也就是

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {ccb.mu.Lock()defer ccb.mu.Unlock()if ccb.subConns == nil {return}// Update picker before updating state.  Even though the ordering here does// not matter, it can lead to multiple calls of Pick in the common start-up// case where we wait for ready and then perform an RPC.  If the picker is// updated later, we could call the "connecting" picker when the state is// updated, and then call the "ready" picker after the picker gets updated.ccb.cc.blockingpicker.updatePicker(s.Picker)ccb.cc.csMgr.updateState(s.ConnectivityState)
}

然后blockingpicker的updatePicker是

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {pw.mu.Lock()if pw.done {pw.mu.Unlock()return}pw.picker = p// pw.blockingCh should never be nil.close(pw.blockingCh)pw.blockingCh = make(chan struct{})pw.mu.Unlock()
}

其实就是更新pickerWrapper,并且通知通过close通知picker有更新。
然后调用updateState也就是csMgr

// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {csm.mu.Lock()defer csm.mu.Unlock()if csm.state == connectivity.Shutdown {return}if csm.state == state {return}csm.state = statechannelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)if csm.notifyChan != nil {// There are other goroutines waiting on this channel.close(csm.notifyChan)csm.notifyChan = nil}
}

这里的就是更新ClientConn中的connectivityStateManager的状态。

ccBalancerWrapper.updateClientConnState

上面的cc.applyServiceConfigAndBalancer说完了,然后就是updateClientConnState方法,

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {ccb.balancerMu.Lock()defer ccb.balancerMu.Unlock()return ccb.balancer.UpdateClientConnState(*ccs)
}

然后就是调用balancer的UpdateClientConnState。注意这里的balancer还是pickfirst。看一下实现

func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {if len(cs.ResolverState.Addresses) == 0 {b.ResolverError(errors.New("produced zero addresses"))return balancer.ErrBadResolverState}if b.sc == nil {var err errorb.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})if err != nil {if logger.V(2) {logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)}b.state = connectivity.TransientFailureb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},})return balancer.ErrBadResolverState}b.state = connectivity.Idleb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})b.sc.Connect()} else {b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)b.sc.Connect()}return nil
}

这里的sc就是subConn。然后就是调用cc的NewSubConn,也就是ccBalancerWrapper的NewSubConn。然后看一下实现

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {if len(addrs) <= 0 {return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")}ccb.mu.Lock()defer ccb.mu.Unlock()if ccb.subConns == nil {return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")}ac, err := ccb.cc.newAddrConn(addrs, opts)if err != nil {return nil, err}acbw := &acBalancerWrapper{ac: ac}acbw.ac.mu.Lock()ac.acbw = acbwacbw.ac.mu.Unlock()ccb.subConns[acbw] = struct{}{}return acbw, nil
}

然后就是ac的实现,也就是

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {ac := &addrConn{state:        connectivity.Idle,cc:           cc,addrs:        addrs,scopts:       opts,dopts:        cc.dopts,czData:       new(channelzData),resetBackoff: make(chan struct{}),}ac.ctx, ac.cancel = context.WithCancel(cc.ctx)// Track ac in cc. This needs to be done before any getTransport(...) is called.cc.mu.Lock()if cc.conns == nil {cc.mu.Unlock()return nil, ErrClientConnClosing}if channelz.IsOn() {ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{Desc:     "Subchannel Created",Severity: channelz.CtInfo,Parent: &channelz.TraceEventDesc{Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),Severity: channelz.CtInfo,},})}cc.conns[ac] = struct{}{}cc.mu.Unlock()return ac, nil
}

所以就是返回了acBalancerWrapper这个结构体,然后看一下ccBalancerWrapper的UpdateState方法,这个上面说过,主要是更新
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
这两个方法。
然后就是connect‘,其实就是根据地址去真正的连接后端的地址。

然后看一下acBalancerWrapper的connect方法,也就是

func (acbw *acBalancerWrapper) Connect() {acbw.mu.Lock()defer acbw.mu.Unlock()go acbw.ac.connect()
}

然后看一下addrConn的connect实现,

// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {ac.mu.Lock()if ac.state == connectivity.Shutdown {ac.mu.Unlock()return errConnClosing}if ac.state != connectivity.Idle {ac.mu.Unlock()return nil}// Update connectivity state within the lock to prevent subsequent or// concurrent calls from resetting the transport more than once.// 更新ac的状态ac.updateConnectivityState(connectivity.Connecting, nil)ac.mu.Unlock()// 更新地址ac.resetTransport()return nil
}

然后就是 ac.tryAllAddrs方法,然后就是在调用ac.createTransport。最后调用transport.NewClientTransport方法。看一下实现

// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}

到这里就是http2的逻辑了,然后看一下newHTTP2Client的实现

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {scheme := "http"ctx, cancel := context.WithCancel(ctx)defer func() {if err != nil {cancel()}}()// gRPC, resolver, balancer etc. can specify arbitrary data in the// Attributes field of resolver.Address, which is shoved into connectCtx// and passed to the dialer and credential handshaker. This makes it possible for// address specific arbitrary data to reach custom dialers and credential handshakers.connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)if err != nil {if opts.FailOnNonTempDialError {return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)}return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)}// Any further errors will close the underlying connectiondefer func(conn net.Conn) {if err != nil {conn.Close()}}(conn)kp := opts.KeepaliveParams// Validate keepalive parameters.if kp.Time == 0 {kp.Time = defaultClientKeepaliveTime}if kp.Timeout == 0 {kp.Timeout = defaultClientKeepaliveTimeout}keepaliveEnabled := falseif kp.Time != infinity {if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)}keepaliveEnabled = true}var (isSecure boolauthInfo credentials.AuthInfo)transportCreds := opts.TransportCredentialsperRPCCreds := opts.PerRPCCredentialsif b := opts.CredsBundle; b != nil {if t := b.TransportCredentials(); t != nil {transportCreds = t}if t := b.PerRPCCredentials(); t != nil {perRPCCreds = append(perRPCCreds, t)}}if transportCreds != nil {rawConn := conn// Pull the deadline from the connectCtx, which will be used for// timeouts in the authentication protocol handshake. Can ignore the// boolean as the deadline will return the zero value, which will make// the conn not timeout on I/O operations.deadline, _ := connectCtx.Deadline()rawConn.SetDeadline(deadline)conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)rawConn.SetDeadline(time.Time{})if err != nil {return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)}for _, cd := range perRPCCreds {if cd.RequireTransportSecurity() {if ci, ok := authInfo.(interface {GetCommonAuthInfo() credentials.CommonAuthInfo}); ok {secLevel := ci.GetCommonAuthInfo().SecurityLevelif secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")}}}}isSecure = trueif transportCreds.Info().SecurityProtocol == "tls" {scheme = "https"}}dynamicWindow := trueicwz := int32(initialWindowSize)if opts.InitialConnWindowSize >= defaultWindowSize {icwz = opts.InitialConnWindowSizedynamicWindow = false}writeBufSize := opts.WriteBufferSizereadBufSize := opts.ReadBufferSizemaxHeaderListSize := defaultClientMaxHeaderListSizeif opts.MaxHeaderListSize != nil {maxHeaderListSize = *opts.MaxHeaderListSize}t := &http2Client{ctx:                   ctx,ctxDone:               ctx.Done(), // Cache Done chan.cancel:                cancel,userAgent:             opts.UserAgent,conn:                  conn,remoteAddr:            conn.RemoteAddr(),localAddr:             conn.LocalAddr(),authInfo:              authInfo,readerDone:            make(chan struct{}),writerDone:            make(chan struct{}),goAway:                make(chan struct{}),framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),fc:                    &trInFlow{limit: uint32(icwz)},scheme:                scheme,activeStreams:         make(map[uint32]*Stream),isSecure:              isSecure,perRPCCreds:           perRPCCreds,kp:                    kp,statsHandler:          opts.StatsHandler,initialWindowSize:     initialWindowSize,onPrefaceReceipt:      onPrefaceReceipt,nextID:                1,maxConcurrentStreams:  defaultMaxStreamsClient,streamQuota:           defaultMaxStreamsClient,streamsQuotaAvailable: make(chan struct{}, 1),czData:                new(channelzData),onGoAway:              onGoAway,onClose:               onClose,keepaliveEnabled:      keepaliveEnabled,bufferPool:            newBufferPool(),}if md, ok := addr.Metadata.(*metadata.MD); ok {t.md = *md} else if md := imetadata.Get(addr); md != nil {t.md = md}t.controlBuf = newControlBuffer(t.ctxDone)if opts.InitialWindowSize >= defaultWindowSize {t.initialWindowSize = opts.InitialWindowSizedynamicWindow = false}if dynamicWindow {t.bdpEst = &bdpEstimator{bdp:               initialWindowSize,updateFlowControl: t.updateFlowControl,}}if t.statsHandler != nil {t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{RemoteAddr: t.remoteAddr,LocalAddr:  t.localAddr,})connBegin := &stats.ConnBegin{Client: true,}t.statsHandler.HandleConn(t.ctx, connBegin)}if channelz.IsOn() {t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))}if t.keepaliveEnabled {t.kpDormancyCond = sync.NewCond(&t.mu)go t.keepalive()}// Start the reader goroutine for incoming message. Each transport has// a dedicated goroutine which reads HTTP2 frame from network. Then it// dispatches the frame to the corresponding stream entity.go t.reader()// Send connection preface to server.n, err := t.conn.Write(clientPreface)if err != nil {err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)t.Close(err)return nil, err}if n != len(clientPreface) {err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))t.Close(err)return nil, err}var ss []http2.Settingif t.initialWindowSize != defaultWindowSize {ss = append(ss, http2.Setting{ID:  http2.SettingInitialWindowSize,Val: uint32(t.initialWindowSize),})}if opts.MaxHeaderListSize != nil {ss = append(ss, http2.Setting{ID:  http2.SettingMaxHeaderListSize,Val: *opts.MaxHeaderListSize,})}err = t.framer.fr.WriteSettings(ss...)if err != nil {err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)t.Close(err)return nil, err}// Adjust the connection flow control window if needed.if delta := uint32(icwz - defaultWindowSize); delta > 0 {if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)t.Close(err)return nil, err}}t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)if err := t.framer.writer.Flush(); err != nil {return nil, err}go func() {t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)err := t.loopy.run()if err != nil {if logger.V(logLevel) {logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)}}// Do not close the transport.  Let reader goroutine handle it since// there might be data in the buffers.t.conn.Close()t.controlBuf.finish()close(t.writerDone)}()return t, nil
}

可以看出来这里的http2的实现和之前是大同小异的,这里就不多描述了。
接下来化了一个流程图来进行帮助记忆。
在这里插入图片描述

相关文章:

【go语言grpc之client端源码分析二】

go语言grpc之server端源码分析二DialContextparseTargetAndFindResolvergetResolvernewCCResolverWrapperccResolverWrapper.UpdateStatecc.maybeApplyDefaultServiceConfigccBalancerWrapper.updateClientConnState上一篇文章分析了ClientConn的主要结构体成员&#xff0c;然后…...

centos7安装RabbitMQ

1、查看本机基本信息 查看Linux发行版本 uname -a # Linux VM-0-8-centos 3.10.0-1160.11.1.el7.x86_64 #1 SMP Fri Dec 18 16:34:56 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux cat /etc/redhat-release # CentOS Linux release 7.9.2009 (Core)2、创建创建工作目录 mkdir /…...

node基于springboot 口腔卫生防护口腔牙科诊所管理系统

目录 1 绪论 1 1.1课题背景 1 1.2课题研究现状 1 1.3初步设计方法与实施方案 2 1.4本文研究内容 2 2 系统开发环境 4 2.1 JAVA简介 4 2.2MyEclipse环境配置 4 2.3 B/S结构简介 4 2.4MySQL数据库 5 2.5 SPRINGBOOT框架 5 3 系统分析 6 3.1系统可行性分析 6 3.1.1经济可行性 6 3.…...

Linux常用命令之find命令详解

简介 find命令主要用于&#xff1a;用来在指定目录下查找文件。任何位于参数之前的字符串都将被视为欲查找的目录名。 如果使用该命令时&#xff0c;不设置任何参数&#xff0c;则find命令将在当前目录下查找子目录与文件。并且将查找到的子目录和文件全部进行显示。 是我们在…...

CMake 入门学习4 软件包管理

CMake 入门学习4 软件包管理一、Linux下的软件包管理1. 检索已安装的软件包2. 让自己编译软件支持pkg-config搜索3. 在CMakeLists查找已安装的软件包二、适合Windows下的包管理工具1. vcpkg2. Conan(1) 安装Conan(2) 配置Conan(3) 创建工程(4) 安装依赖库(5) 使用依赖库三、CMa…...

【数据库数据乱码错误】存进去的数据乱码(???)

目录 1.当我新增一条数据的时候&#xff0c;成功后查看数据库中的数据时&#xff0c;竟然变成&#xff1f;&#xff1f;&#xff1f;乱码格式了&#xff1a; 2.那么问题有3处需要注意&#xff1a; 第一&#xff1a;settings配置 第二&#xff1a;POM文件 第三&#xff1a;…...

rewrite中的if、break、last

目录 rewrite 作用&#xff1a; 依赖&#xff1a; 打开重定向日志&#xff1a; if 判断&#xff1a; location {} 本身有反复匹配执行特征 在 location 中加入 break 和 last &#xff08;不一样&#xff09; 加了break后&#xff0c;立刻停止向下 且 跳出。 加了last&#xf…...

JavaSE-线程池(5)- 建议使用的方式

JavaSE-线程池&#xff08;5&#xff09;- 建议使用的方式 虽然JDK Executors 工具类提供了默认的创建线程池的方法&#xff0c;但一般建议自定义线程池参数&#xff0c;下面是阿里巴巴开发手册给出的理由&#xff1a; 另外Spring也提供了线程池的实现&#xff0c;比如 Thread…...

城市轨道交通供电系统研究(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5;&#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密…...

什么是 RESTful 风格?

一、什么是 REST &#xff1f; REST即表述性状态传递&#xff08;英文&#xff1a;Representational State Transfer&#xff0c;简称REST&#xff09;是Roy Thomas Fielding博士在2000年他的博士论文中提出来的一种软件架构风格。它是一种针对网络应用的设计和开发方式&#…...

从业6年,对敏捷和自动化测试的一点心得

不久前&#xff0c;参加Thoughtworks组织的一场自动化测试的分享&#xff0c;同事由于出差国外不能参加&#xff0c;特意嘱托我提问两个问题&#xff1a; 在互联网这个将“敏捷”与“持续集成”进行积极实践的环境里&#xff0c;“敏捷测试”与“自动化测试”成了一个大家经常…...

ThreeJS 之界面控制

文章目录参考描述界面自适应问题resize 事件修改画布大小修改视锥体的宽高比全屏显示dblclick 事件检测全屏显示状态进入全屏显示状态退出全屏显示状态尾声参考 项目描述ThreeJS官方文档哔哩哔哩老陈打码搜索引擎BingMDN 文档document.mozFullScreenElementMDN 文档Element.re…...

【查找算法】解析学习四大常用的计算机查找算法 | C++

第二十二章 四大查找算法 目录 第二十二章 四大查找算法 ●前言 ●查找算法 ●一、顺序查找法 1.什么是顺序查找法&#xff1f; 2.案例实现 ●二、二分查找法 1.什么是二分查找法&#xff1f; 2.案例实现 ●三、插值查找法 1.什么是插值查找法&#xff1f; 2…...

Android实例仿真之一

目录 零 开局三问 第一问&#xff1a;为什么要有这一章&#xff1f; 第二问&#xff1a;Android算不算是一个嵌入式系统&#xff1f; 第三问&#xff1a;用什么方法来分析Android这个大系统&#xff1f; 一 讨论Android的流行 二 深入浅出Android 零 开局三问 在正式开始…...

软考高级-信息系统管理师之重要工具和技术的口语化表示(最新版)

重要工具和技术的口语化表示 本文主要介绍重要工具和技术的口语化解释 1、 模板、表格和标准:就是用之前的项目的模版、表格、标准,结合本项目进行了修改,在编制一些计划、方案的时候就可以采用这个工具和技术。可以拿来就用的,节约时间、提高质量的。 2、 产品分析:通过一…...

基于springboot+vue的个人健康信息服务平台

基于springbootvue的个人健康信息服务平台 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背…...

SpringBoot2.x实战专题——SpringBoot2 多配置文件【开发环境、测试环境、生产环境】

SpringBoot2.x实战专题——SpringBoot2 多配置文件【开发环境、测试环境、生产环境】 目录SpringBoot2.x实战专题——SpringBoot2 多配置文件【开发环境、测试环境、生产环境】一、创建一个SpringBoot项目二、修改pom.xml中SpringBoot的版本三、配置文件3.1 application-dev.ym…...

测试2:编写测试用例的方法

2.编写测试用例的方法 7种 测试常用的方法&#xff1a;code review 代码静态分析、CI/CD CI–持续集成–开发成员经常集成它们的工作&#xff0c;尽快发现集成错误 CD–持续部署–将集成后的代码部署到更贴近真实运行的环境 2.1 测试用例的描述&#xff1a; 用例编号 用例…...

docker安装配置镜像加速器-拉取创建Mysql容器示例

List item docker 常见命令大全docker安装docker拉取创建Mysql容器docker 安装 1、安装链接&#xff1a;https://blog.csdn.net/BThinker/article/details/123358697 &#xff1b; 2、安装完成需要配置docker镜像加速器 3、docker 镜像加速器推荐使用阿里云的&#xff1a; 编…...

WSL1和WSL2相互转换以及安装路径迁移相关问题

目录 1.从WSL 1如何切换到WSL 2&#xff1f; 2.从WSL 2如何切换回WSL 1&#xff1f; 3.WSL1转换为WSL2后&#xff0c;WSL1里面安装的程序和库需要重装吗&#xff1f; 4.WSL2转换为WSL1后&#xff0c;WSL2里面安装的程序和库需要重装吗&#xff1f; 5.如何备份WSL2&#xf…...

系统分析*

文章目录系统分析分析的任务结构化方法OO的方法的任务常用的详细调查方法有哪些&#xff1f;系统分析的建模TFD业务流程图DFDDD数据流图用例模型&#xff08;重点用例图&#xff09;用例图的内容&#xff1a;用例之间的关系&#xff1a;对象模型&#xff08;类图&#xff09;时…...

【redis】持久化:RDB和AOF

redis的持久化指将数据写入可靠内存中&#xff0c;如ssd。Redis提供了4种持久化策略 RDB&#xff1a;Redis Database&#xff0c;周期性的将某个时间点的数据集快照持久化AOF&#xff1a;Append Only File&#xff0c;每次redis服务接收到写操作(修改内存的操作)&#xff0c;都…...

2023Python接口自动化测试实战教程,附视频实战讲解

这两天一直在找直接用python做接口自动化的方法&#xff0c;在网上也搜了一些博客参考&#xff0c;今天自己动手试了一下。 一、整体结构 上图是项目的目录结构&#xff0c;下面主要介绍下每个目录的作用。 Common:公共方法:主要放置公共的操作的类&#xff0c;比如数据库sql…...

【原创】java+swing+sqlserver药品管理系统设计与实现

之前数据库都是用的mysql&#xff0c;今天我们使用sqlserver在配合swing来开发一个药品管理系统。方便医院工作人员进行药品的管理&#xff0c;基础功能基本都是一些增删改查操作。 功能分析&#xff1a; 药品管理系统主要提供给管理员和员工使用&#xff0c;功能如下&#x…...

软考高级信息系统项目管理师系列之二十七:信息文档管理与配置管理

软考高级信息系统项目管理师系列之二十七:信息文档管理与配置管理 一、信息文档管理与配置管理内容整理二、信息系统文档管理1.信息系统文档概念2.软件文档分类与质量等级三、配置管理1.配置管理2.典型配置项3.配置项4.配置项操作权限5.配置项状态6.配置项版本号7.配置项版本管…...

软考高级-信息系统管理师之项目管理基础(最新版)

项目管理基础 项目管理特点战略管理三个过程IT项目特点项目管理概念项目管理特点软技能PRINCE2的四个要素组织结构职能型组织优缺点职能型组织优点同时,职能型组织也存在着如下缺点:项目型组织优缺点项目型组织优点项目型组织也存在着如下缺点:矩阵型组织优缺点矩阵型组织的优…...

leetcode240+Search a 2D Matrix II+从右上角开始

链接 class Solution { public:bool searchMatrix(vector<vector<int>>& matrix, int target) {if(matrix.size()0 || matrix[0].size()0) return false;int i0, jmatrix[0].size()-1; //从右上角开始while (i<matrix.size()&&j>0) {int x mat…...

0xL4ugh 2023

这回跟着个队伍跑&#xff0c;不过还是2X以后的成绩&#xff0c;前边太卷了。自己会的部分&#xff0c;有些是别人已经提交了的。记录一下。Cryptocrypto 1给了一些数据&#xff0c;像这样就没有别的了ct [0, 1, 1, 2, 5, 10, 20, 40, 79, 159, 317, 635, 1269, 2538, 5077, 1…...

Mybatis(4)之跟着老杜做一个简单的银行转账会话

这是个MVC项目&#xff0c;我不一定可以完整的实现这个项目&#xff0c;但力求把这个复现出来&#xff0c;尽量的复现细节。 第一步&#xff1a;创建数据库 表 创建表如下&#xff1a; 我们使用 int 是为了方便 然后采用 demcial&#xff0c;精确度较高 添加两个用户 然后…...

VBA提高篇_ 22 事件处理

文章目录1.事件编程2.常用工作簿事件名称与对应处理过程名称示例3. 事件编程的步骤4&#xff0e;工作簿事件4.1 Open4.2 BeforeClose4.3 NewSheet5&#xff0e;工作表事件6&#xff0e;变量和过程函数的作用域1.事件编程 写在事件发生地(对应工作簿或工作表) 2.常用工作簿事…...