在fabric中,有两类链码,一类是系统链码,一类是用户链码。而链码都需要安装和实例化才能使用,在这当中,它们虽然原理相似,但是实现的方式还是有所不同。在系统链码中,首先要Register,然后再Deploy才能使用;而用户链码则首先要Install,然后再instantiate就可以被外部接口使用了。 在前面的分析中可以知道,Launch函数是启动容器的入口。那么就从Launch这个函数开始看(core/chaincode/chaincode_support.go): 1、容器的启动 其它mock部分就不列出来了,供测试使用的,有兴趣可以看看源码。再看一下实例的具体生成,沿着上面的NewVM来看: 这里只分析启动部分,其它和这个基本差不多: 这里面会判断是否生成了链码的进程,否则: 系统链码直接启动了内存的虚拟机。只有用户链码才会启动Docker,在内部运行虚拟机。所以这二者在类的生成中才从同一接口继承但分成了两个类。特别需要注意的是要看看上面对Peer侧和链码侧的消息处理的生成过程,这个非常重要。代码里有直接注释,感兴趣可以把代码跟到底看看到底是如何生成的。 2、消息的传递 1、虚拟机的启动 看一下创建容器的代码: 2、消息的传递 即Register调用HandleChaincodeStream,再调用ProcessStream,在其中的默认选项中调用handleMessage,又回到了Peer侧。 通过分析两类链码容器和执行情况,基本上就明白了链码源码执行的环境,这正是对以前的“链码源码分析”的进一步完善。结合这两篇文章基本上就明白了链码在Fabric上执行的整个流程和方式。掌握了链码执行的原理和运行的过程,就可以针对实际情况对其做相应的优化和修改,从为我所用到我想我用。
一、容器和虚拟机
因此,对容器的启动也可分成这两部分来进行解析,从宏观上把握入口,然后分类进行源码的解析。
二、整体的入口
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) { cname := chaincodeName + ":" + chaincodeVersion if h := cs.HandlerRegistry.Handler(cname); h != nil { return h, nil } //此处到得容器相关的信息,包括生产容器的具体类型是系统链码容器还是用户链码容器 //在后面会说明,系统链码启动的容器是:inprocVM---inproContainer,用户链码启动的容器是DockerVM---DockerContainer ccci, err := cs.Lifecycle.ChaincodeContainerInfo(chaincodeName, qe) if err != nil { // TODO: There has to be a better way to do this... if cs.UserRunsCC { chaincodeLogger.Error( "You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode. Did you forget to Deploy your chaincode?", ) } return nil, errors.Wrapf(err, "[channel %s] failed to get chaincode container info for %s", chainID, cname) } //启动Runtime中的Launch if err := cs.Launcher.Launch(ccci); err != nil { return nil, errors.Wrapf(err, "[channel %s] could not launch chaincode %s", chainID, cname) } h := cs.HandlerRegistry.Handler(cname) if h == nil { return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname) } return h, nil } //runtime_launcher.go func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error { var startFailCh chan error var timeoutCh <-chan time.Time startTime := time.Now() cname := ccci.Name + ":" + ccci.Version launchState, alreadyStarted := r.Registry.Launching(cname) if !alreadyStarted { startFailCh = make(chan error, 1) timeoutCh = time.NewTimer(r.StartupTimeout).C codePackage, err := r.getCodePackage(ccci) if err != nil { return err } go func() { //启动Process if err := r.Runtime.Start(ccci, codePackage); err != nil { startFailCh <- errors.WithMessage(err, "error starting container") return } exitCode, err := r.Runtime.Wait(ccci) if err != nil { launchState.Notify(errors.Wrap(err, "failed to wait on container exit")) } launchState.Notify(errors.Errorf("container exited with 
%d", exitCode)) }() } ...... return err } // Start launches chaincode in a runtime environment. func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error { cname := ccci.Name + ":" + ccci.Version lc, err := c.LaunchConfig(cname, ccci.Type) if err != nil { return err } chaincodeLogger.Debugf("start container: %s", cname) chaincodeLogger.Debugf("start container with args: %s", strings.Join(lc.Args, " ")) chaincodeLogger.Debugf("start container with env:nt%s", strings.Join(lc.Envs, "nt")) scr := container.StartContainerReq{ Builder: &container.PlatformBuilder{ Type: ccci.Type, Name: ccci.Name, Version: ccci.Version, Path: ccci.Path, CodePackage: codePackage, PlatformRegistry: c.PlatformRegistry, }, Args: lc.Args, Env: lc.Envs, FilesToUpload: lc.Files, CCID: ccintf.CCID{ Name: ccci.Name, Version: ccci.Version, }, } //启动Process--注意传入的容器类型 if err := c.Processor.Process(ccci.ContainerType, scr); err != nil { return errors.WithMessage(err, "error starting container") } return nil } func (vmc *VMController) Process(vmtype string, req VMCReq) error { v := vmc.newVM(vmtype) ccid := req.GetCCID() id := ccid.GetName() vmc.lockContainer(id) defer vmc.unlockContainer(id) //启动虚拟机 return req.Do(v) } //到这里容器的实例化就进入到了接口的具体确定阶段,根据不同的类型来确定是SCC或ACC func (vmc *VMController) newVM(typ string) VM { v, ok := vmc.vmProviders[typ] if !ok { vmLogger.Panicf("Programming error: unsupported VM type: %s", typ) } return v.NewVM() }
三、系统容器虚拟机的启动
在上面的Process中最后一行代码中req.Do(v),启动了相关的虚拟机容器。看一下这个接口的定义:
// VMCReq is the request type accepted by VMController.Process, which reads
// the chaincode ID through GetCCID and then executes the request with Do.
type VMCReq interface {
	// Do runs the request against the given VM.
	Do(v VM) error
	// GetCCID returns the chaincode ID the request refers to.
	GetCCID() ccintf.CCID
}

// StartContainerReq - properties for starting a container.
// Its Do method forwards CCID, Args, Env, FilesToUpload and Builder to VM.Start.
type StartContainerReq struct {
	ccintf.CCID
	Builder       Builder
	Args          []string
	Env           []string
	FilesToUpload map[string][]byte
}

// StopContainerReq - properties for stopping a container.
type StopContainerReq struct {
	ccintf.CCID
	Timeout uint
	// by default we will kill the container after stopping
	Dontkill bool
	// by default we will remove the container after killing
	Dontremove bool
}

// Do starts a goroutine that waits on the VM for the container identified by
// w.CCID and hands the exit code and error to the w.Exited callback. Do itself
// returns nil immediately without waiting for the container to exit.
func (w WaitContainerReq) Do(v VM) error {
	exited := w.Exited
	go func() {
		exitCode, err := v.Wait(w.CCID)
		exited(exitCode, err)
	}()
	return nil
}
// NewVM creates an inproc VM instance func (r *Registry) NewVM() container.VM { return NewInprocVM(r) } // NewInprocVM creates a new InprocVM func NewInprocVM(r *Registry) *InprocVM { return &InprocVM{ registry: r, } }
// Do starts the container described by the request on the given VM by
// forwarding the request's fields to v.Start.
func (si StartContainerReq) Do(v VM) error {
	return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}

// Start starts a previously registered system chaincode.
//
// The chaincode is looked up in the registry by ccid's name. An error is
// returned when the name is not registered, when no instance can be obtained,
// or when the instance is already marked as running. Otherwise the instance is
// marked as running and launched in a new goroutine; Start returns nil without
// waiting for the launch to finish. filesToUpload and builder are not used.
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
	path := ccid.GetName()

	ipctemplate := vm.registry.getType(path)
	if ipctemplate == nil {
		// Pass the name as an argument rather than as part of the format
		// string, so a '%' in the name cannot garble the message.
		return fmt.Errorf("%s not registered", path)
	}

	instName := vm.GetVMName(ccid)

	ipc, err := vm.getInstance(ipctemplate, instName, args, env)
	if err != nil {
		// NOTE(review): the underlying err is not included in the returned
		// message (unchanged from the original).
		return fmt.Errorf("could not create instance for %s", instName)
	}

	if ipc.running {
		return fmt.Errorf("chaincode running %s", path)
	}

	ipc.running = true

	go func() {
		// A panic inside the chaincode is logged instead of being allowed to
		// propagate out of this goroutine.
		defer func() {
			if r := recover(); r != nil {
				inprocLogger.Criticalf("caught panic from chaincode  %s", instName)
			}
		}()
		// NOTE(review): the error returned by launchInProc is discarded.
		ipc.launchInProc(instName, args, env)
	}()

	return nil
}
// launchInProc runs an in-process chaincode: it wires the chaincode side and
// the peer side together with two unbuffered channels, starts one goroutine
// per side, and blocks until either side finishes or ipc.stopChan fires.
//
// id is only used in log messages. When args or env is nil, the values stored
// on ipc are used instead. The function panics (via the logger) when
// ipc.ChaincodeSupport is nil.
//
// NOTE(review): the outer err is never assigned in this function - both
// goroutines declare their own err with := - so the returned error appears to
// be always nil.
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
	if ipc.ChaincodeSupport == nil {
		inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
	}

	// Create one send channel and one receive channel: the chaincode side
	// sends on peerRcvCCSend and receives on ccRcvPeerSend; the peer side does
	// the opposite.
	peerRcvCCSend := make(chan *pb.ChaincodeMessage)
	ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
	var err error
	// Closed when the chaincode-side goroutine returns.
	ccchan := make(chan struct{}, 1)
	// Closed when the peer-side goroutine returns.
	ccsupportchan := make(chan struct{}, 1)
	shimStartInProc := _shimStartInProc // shadow to avoid race in test
	// Chaincode side.
	go func() {
		defer close(ccchan)
		inprocLogger.Debugf("chaincode started for %s", id)
		if args == nil {
			args = ipc.args
		}
		if env == nil {
			env = ipc.env
		}
		err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
		if err != nil {
			err = fmt.Errorf("chaincode-support ended with err: %s", err)
			// NOTE(review): this uses the package-level _inprocLoggerErrorf
			// directly; the local shadow copy is only created after this
			// goroutine has been started.
			_inprocLoggerErrorf("%s", err)
		}
		inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
	}()

	// shadow function to avoid data race
	inprocLoggerErrorf := _inprocLoggerErrorf
	// Peer side.
	go func() {
		defer close(ccsupportchan)
		inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
		inprocLogger.Debugf("chaincode-support started for %s", id)
		// Message handling on the peer side.
		err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
		if err != nil {
			err = fmt.Errorf("chaincode ended with err: %s", err)
			inprocLoggerErrorf("%s", err)
		}
		inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
	}()

	// Whichever side finishes first, close the channel that side was sending
	// on; on an explicit stop, close both channels.
	select {
	case <-ccchan:
		close(peerRcvCCSend)
		inprocLogger.Debugf("chaincode %s quit", id)
	case <-ccsupportchan:
		close(ccRcvPeerSend)
		inprocLogger.Debugf("chaincode support %s quit", id)
	case <-ipc.stopChan:
		close(ccRcvPeerSend)
		close(peerRcvCCSend)
		inprocLogger.Debugf("chaincode %s stopped", id)
	}
	return err
}

// StartInProc is an entry point for system chaincodes bootstrap. It is not an
// API for chaincodes.
func StartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) error { chaincodeLogger.Debugf("in proc %v", args) var chaincodename string for _, v := range env { if strings.Index(v, "CORE_CHAINCODE_ID_NAME=") == 0 { p := strings.SplitAfter(v, "CORE_CHAINCODE_ID_NAME=") chaincodename = p[1] break } } if chaincodename == "" { return errors.New("error chaincode id not provided") } stream := newInProcStream(recv, send) chaincodeLogger.Debugf("starting chat with peer using name=%s", chaincodename) //看看这是谁,Handler消息处理就在这个函数里,前面分析过,这里就不再赘述 err := chatWithPeer(chaincodename, stream, cc) return err }
看到了chatWithPeer,就想到了handleMessage,这个在前面有详细分析,如果有什么不明白可以看看“链码源码分析”。下面只列出代码:
// handleMessage processes one message received on the shim side of the
// chaincode/peer stream.
//
// KEEPALIVE messages are echoed back asynchronously and never fail. Every
// other message is dispatched according to the handler's current state. When
// the dispatch fails, an ERROR message carrying the error text and the
// original Txid is sent to the peer and the error is returned.
func (handler *Handler) handleMessage(msg *pb.ChaincodeMessage, errc chan error) error {
	if msg.Type == pb.ChaincodeMessage_KEEPALIVE {
		chaincodeLogger.Debug("Sending KEEPALIVE response")
		// Send errors are ignored on purpose; the next KEEPALIVE may succeed.
		handler.serialSendAsync(msg, nil)
		return nil
	}

	chaincodeLogger.Debugf("[%s] Handling ChaincodeMessage of type: %s(state:%s)", shorttxid(msg.Txid), msg.Type, handler.state)

	var failure error
	switch current := handler.state; current {
	case ready:
		failure = handler.handleReady(msg, errc)
	case established:
		failure = handler.handleEstablished(msg, errc)
	case created:
		failure = handler.handleCreated(msg, errc)
	default:
		failure = errors.Errorf("[%s] Chaincode handler cannot handle message (%s) with payload size (%d) while in state: %s", msg.Txid, msg.Type, len(msg.Payload), handler.state)
	}

	if failure == nil {
		return nil
	}

	// Report the failure back to the peer before returning it.
	handler.serialSend(&pb.ChaincodeMessage{
		Type:    pb.ChaincodeMessage_ERROR,
		Payload: []byte(failure.Error()),
		Txid:    msg.Txid,
	})
	return failure
}
四、用户容器虚拟机的启动
// NewVM creates a new DockerVM instance func (p *Provider) NewVM() container.VM { return NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics) } // NewDockerVM returns a new DockerVM instance func NewDockerVM(peerID, networkID string, buildMetrics *BuildMetrics) *DockerVM { return &DockerVM{ PeerID: peerID, NetworkID: networkID, getClientFnc: getDockerClient, BuildMetrics: buildMetrics, } } // Start starts a container using a previously created docker image func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error { imageName, err := vm.GetVMNameForDocker(ccid) if err != nil { return err } attachStdout := viper.GetBool("vm.docker.attachStdout") containerName := vm.GetVMName(ccid) logger := dockerLogger.With("imageName", imageName, "containerName", containerName) //通过VM获得客户端 client, err := vm.getClientFnc() if err != nil { logger.Debugf("failed to get docker client", "error", err) return err } //停止容器和虚拟机 vm.stopInternal(client, containerName, 0, false, false) // 此处创建容器 err = vm.createContainer(client, imageName, containerName, args, env, attachStdout) if err == docker.ErrNoSuchImage { //如果没有镜像,则使用builder来创建相关容器 reader, err := builder.Build() if err != nil { return errors.Wrapf(err, "failed to generate Dockerfile to build %s", containerName) } //部署镜像 err = vm.deployImage(client, ccid, reader) if err != nil { return err } //创建镜像后,再创建容器 err = vm.createContainer(client, imageName, containerName, args, env, attachStdout) if err != nil { logger.Errorf("failed to create container: %s", err) return err } } else if err != nil { logger.Errorf("create container failed: %s", err) return err } // stream stdout and stderr to chaincode logger if attachStdout { containerLogger := flogging.MustGetLogger("peer.chaincode." 
+ containerName) streamOutput(dockerLogger, client, containerName, containerLogger) } // upload specified files to the container before starting it // this can be used for configurations such as TLS key and certs //处理容器需要的证书相关的文件 if len(filesToUpload) != 0 { // the docker upload API takes a tar file, so we need to first // consolidate the file entries to a tar payload := bytes.NewBuffer(nil) gw := gzip.NewWriter(payload) tw := tar.NewWriter(gw) for path, fileToUpload := range filesToUpload { cutil.WriteBytesToPackage(path, fileToUpload, tw) } // Write the tar file out if err := tw.Close(); err != nil { return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err) } gw.Close() //上传数据 err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{ InputStream: bytes.NewReader(payload.Bytes()), Path: "/", NoOverwriteDirNonDir: false, }) if err != nil { return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err) } } // start container with HostConfig was deprecated since v1.10 and removed in v1.2 err = client.StartContainer(containerName, nil) if err != nil { dockerLogger.Errorf("start-could not start container: %s", err) return err } dockerLogger.Debugf("Started container %s", containerName) return nil }
func (vm *DockerVM) createContainer(client dockerClient, imageID, containerID string, args, env []string, attachStdout bool) error { logger := dockerLogger.With("imageID", imageID, "containerID", containerID) logger.Debugw("create container") _, err := client.CreateContainer(docker.CreateContainerOptions{ Name: containerID, Config: &docker.Config{ Cmd: args, Image: imageID, Env: env, AttachStdout: attachStdout, AttachStderr: attachStdout, }, HostConfig: getDockerHostConfig(), }) if err != nil { return err } logger.Debugw("created container") return nil } // See https://goo.gl/tyzwVM for more details. func (c *Client) CreateContainer(opts CreateContainerOptions) (*Container, error) { path := "/containers/create?" + queryString(opts) resp, err := c.do( "POST", path, doOptions{ data: struct { *Config HostConfig *HostConfig `json:"HostConfig,omitempty" yaml:"HostConfig,omitempty" toml:"HostConfig,omitempty"` NetworkingConfig *NetworkingConfig `json:"NetworkingConfig,omitempty" yaml:"NetworkingConfig,omitempty" toml:"NetworkingConfig,omitempty"` }{ opts.Config, opts.HostConfig, opts.NetworkingConfig, }, context: opts.Context, }, ) if e, ok := err.(*Error); ok { if e.Status == http.StatusNotFound { return nil, ErrNoSuchImage } if e.Status == http.StatusConflict { return nil, ErrContainerAlreadyExists } // Workaround for 17.09 bug returning 400 instead of 409. 
// See https://github.com/moby/moby/issues/35021 if e.Status == http.StatusBadRequest && strings.Contains(e.Message, "Conflict.") { return nil, ErrContainerAlreadyExists } } if err != nil { return nil, err } defer resp.Body.Close() var container Container if err := json.NewDecoder(resp.Body).Decode(&container); err != nil { return nil, err } container.Name = opts.Name return &container, nil } func (c *Client) startContainer(id string, hostConfig *HostConfig, opts doOptions) error { path := "/containers/" + id + "/start" if c.serverAPIVersion == nil { c.checkAPIVersion() } if c.serverAPIVersion != nil && c.serverAPIVersion.LessThan(apiVersion124) { opts.data = hostConfig opts.forceJSON = true } resp, err := c.do("POST", path, opts) if err != nil { if e, ok := err.(*Error); ok && e.Status == http.StatusNotFound { return &NoSuchContainer{ID: id, Err: err} } return err } defer resp.Body.Close() if resp.StatusCode == http.StatusNotModified { return &ContainerAlreadyRunning{ID: id} } return nil }
在chaincode.go的shim包启动时(用户链码启动时)的Start函数中调用了userChaincodeStreamGetter–>chaincodeSupportClient.Register,而chaincode_support.go中的Register实现了
(protos/peer/chaincode_shim.pb.go)
// ChaincodeSupportServer is the server API for ChaincodeSupport service.
type ChaincodeSupportServer interface {
	// Register takes a ChaincodeSupport_RegisterServer and returns an error.
	// NOTE(review): per the surrounding article, the peer-side implementation
	// lives in chaincode_support.go and is what the shim's
	// chaincodeSupportClient.Register call reaches - confirm against that file.
	Register(ChaincodeSupport_RegisterServer) error
}
而在前面的链码源码分析中已经分析过,一个用户的链码启动是以在链码的main函数中调用 shim.Start()为开始的,此函数最终会调用chatWithPeer函数,其中的默认项为调用handleMessage,开始链码侧的消息循环。这样二者再按照前面提到的通过GRPC互相发送消息,就可以展开一个用户链码和Peer侧的通信了。
五、总结
推荐一下阿里朋友的PerfMa社区的课程,都是干货:
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算