Hyperledger Fabric 学习联盟链 4 -- Hyperledger Fabric共识排序

Dear 丶 2023-06-15 11:05 103阅读 0赞

0.下载慕课网fabric的源码

appledeMacBook-Air:hyperledger apple$ pwd

/Users/apple/go/src/github.com/hyperledger

appledeMacBook-Air:hyperledger apple$ rm -rf fabric

appledeMacBook-Air:hyperledger apple$ mkdir fabric

appledeMacBook-Air:hyperledger apple$ cd fabric

appledeMacBook-Air:fabric apple$ git clone -b release-1.0 https://git.imooc.com/coding-268/coding-268.git

正克隆到 ‘coding-268’…

Username for ‘https://git.imooc.com‘: andy-upp

Password for ‘https://andy-upp@git.imooc.com‘:

remote: Counting objects: 77566, done.

remote: Compressing objects: 100% (25320/25320), done.

remote: Total 77566 (delta 50159), reused 76891 (delta 49501)

接收对象中: 100% (77566/77566), 72.60 MiB | 261.00 KiB/s, 完成.

处理 delta 中: 100% (50159/50159), 完成.

正在检出文件: 100% (4370/4370), 完成.

appledeMacBook-Air:coding-268 apple$ ll

total 848

drwxr-xr-x 46 apple staff 1472 11 21 11:39 ./

drwxr-xr-x 3 apple staff 96 11 21 11:34 ../

-rw-r—r— 1 apple staff 6 11 21 11:39 .baseimage-release

-rw-r—r— 1 apple staff 101 11 21 11:39 .dockerignore

drwxr-xr-x 12 apple staff 384 11 21 11:39 .git/

-rwxr-xr-x 1 apple staff 199 11 21 11:39 .gitattributes*

drwxr-xr-x 4 apple staff 128 11 21 11:39 .github/

-rwxr-xr-x 1 apple staff 866 11 21 11:39 .gitignore*

-rw-r—r— 1 apple staff 62 11 21 11:39 .gitreview

-rw-r—r— 1 apple staff 2428 11 21 11:39 .travis.yml

-rw-r—r— 1 apple staff 332781 11 21 11:39 CHANGELOG.md

-rw-r—r— 1 apple staff 597 11 21 11:39 CODE_OF_CONDUCT.md

-rw-r—r— 1 apple staff 664 11 21 11:39 CONTRIBUTING.md

-rw-r—r— 1 apple staff 11358 11 21 11:39 LICENSE

-rwxr-xr-x 1 apple staff 16417 11 21 11:39 Makefile*

-rw-r—r— 1 apple staff 4909 11 21 11:39 README.md

drwxr-xr-x 16 apple staff 512 11 21 11:39 bccsp/

drwxr-xr-x 32 apple staff 1024 11 21 11:39 bddtests/

-rw-r—r— 1 apple staff 13 11 21 11:39 ci.properties

drwxr-xr-x 17 apple staff 544 11 21 11:39 common/

drwxr-xr-x 21 apple staff 672 11 21 11:39 core/

drwxr-xr-x 13 apple staff 416 11 21 11:39 devenv/ //fabric 开发环境

-rw-r—r— 1 apple staff 3251 11 21 11:39 docker-env.mk

drwxr-xr-x 8 apple staff 256 11 21 11:39 docs/

drwxr-xr-x 4 apple staff 128 11 21 11:39 events/

drwxr-xr-x 8 apple staff 256 11 21 11:39 examples/

drwxr-xr-x 14 apple staff 448 11 21 11:39 gossip/

drwxr-xr-x 3 apple staff 96 11 21 11:39 gotools/

drwxr-xr-x 12 apple staff 384 11 21 11:39 images/

drwxr-xr-x 7 apple staff 224 11 21 11:39 imocc/

-rw-r—r— 1 apple staff 3201 11 21 11:39 mkdocs.yml

drwxr-xr-x 17 apple staff 544 11 21 11:39 msp/

drwxr-xr-x 20 apple staff 640 11 21 11:39 orderer/

drwxr-xr-x 10 apple staff 320 11 21 11:39 peer/

drwxr-xr-x 3 apple staff 96 11 21 11:39 proposals/

drwxr-xr-x 11 apple staff 352 11 21 11:39 protos/

drwxr-xr-x 3 apple staff 96 11 21 11:39 release/

drwxr-xr-x 10 apple staff 320 11 21 11:39 release_notes/

drwxr-xr-x 6 apple staff 192 11 21 11:39 sampleconfig/

drwxr-xr-x 19 apple staff 608 11 21 11:39 scripts/

-rw-r—r— 1 apple staff 316 11 21 11:39 settings.gradle

drwxr-xr-x 8 apple staff 256 11 21 11:39 test/

-rw-r—r— 1 apple staff 301 11 21 11:39 tox.ini

drwxr-xr-x 4 apple staff 128 11 21 11:39 unit-test/

drwxr-xr-x 8 apple staff 256 11 21 11:39 vendor/

-rw-r—r— 1 apple staff 718 11 21 11:39 目录结构.md

appledeMacBook-Air:coding-268 apple$ cd devenv/

appledeMacBook-Air:devenv apple$ ll

total 80

drwxr-xr-x 13 apple staff 416 11 21 11:39 ./

drwxr-xr-x 46 apple staff 1472 11 21 11:39 ../

-rw-r—r— 1 apple staff 1492 11 21 11:39 README.md

-rw-r—r— 1 apple staff 3555 11 21 11:39 Vagrantfile //通过他可以创建一套跟官方人员一样的开发环境

-rw-r—r— 1 apple staff 1536 11 21 11:39 failure-motd.in

-rw-r—r— 1 apple staff 235 11 21 11:39 golang_buildcmd.sh

-rw-r—r— 1 apple staff 423 11 21 11:39 golang_buildpkg.sh

drwxr-xr-x 3 apple staff 96 11 21 11:39 images/

-rw-r—r— 1 apple staff 2328 11 21 11:39 limits.conf

-rwxr-xr-x 1 apple staff 5315 11 21 11:39 setup.sh*

-rw-r—r— 1 apple staff 3424 11 21 11:39 setupRHELonZ.sh

-rwxr-xr-x 1 apple staff 3273 11 21 11:39 setupUbuntuOnPPC64le.sh*

drwxr-xr-x 3 apple staff 96 11 21 11:39 tools/

1.共识机制介绍

fabric中,广义的共识包括3部分,而通常说共识,主要是指交易排序

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70

交易排序主要分为3个步骤:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 1

首先看交易排序:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 2

有限状态机可以理解为:如果一个系统从最初始的状态开始,如果每个状态的改变都一致的话,最后一定会得到一个一致的结果, 对于区块链来说,只有交易的顺序一致,才能保证每个状态一致,最终才能得到一致的结果。

再看区块分发:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 3

在这里需要注意的是,排序节点产生的区块,并不是每个记账节点本地存储的区块, 只是一个中间状态的区块,包含排序节点收到的所有交易,不管是否有效,都会打包传递给其他组织的主节点,可以把排序节点的区块理解为一个临时区块,最后记账节点存储区块的时候会用到临时区块的一些属性,比如区块哈希,区块高度等。

最后看多通道数据隔离:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 4

所有的应用程序都向排序节点提交交易,客户端进行交易提交的时候,会指定一个通道,表明该交易是发往哪一个通道的,排序节点收到该交易后会按照通道进行拆分,拆分以后再进行排序,也就是说排序不是全局排序,而是按照每一个通道单独排序,最后分别组成区块发往主节点。通道之间是相互隔离的,并不知道彼此的存在,但是每个组织的节点还是可以订阅多个通道。

2.源码阅读建议

主要阅读的源码是

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 5

阅读方式是广度优先阅读方法,需要看某个细节时再深入到逻辑

3.共识机制源代码目录结构

# 目录结构

## bccsp 密码学:加密、签名以及证书等等

## bddtests 行为驱动开发 一种新型的软件开发模式,传统的软件开发模式是 需求-》概要设计-〉详细设计-》 开发,但是在行为驱动开发中,为需求-〉开发

## common 公共库 比如 * 错误处理 * 日志处理 * 账本存储 以及* 各种工具 * …

## core fabric的核心库,基本上针对每一个组件都有一个子目录,核心库,组件核心逻辑

## devenv 开发环境、Vagrant

## docs 文档相关

## events 事件监听机制,用于区块链是异步式的分布式系统

## examples 一些例子

## gossip 最终一致性共识算法,用于组织内部区块同步

## images docker镜像打包,之前用到的docker镜像都是通过这个目录下的文件生成的

## msp 成员服务管理 member service provider,读取每一个模块的证书,签名,验签等

## orderer 排序节点入口 留个坑

## peer peer节点入口

## proposals新功能提案

## protos 几乎所有fabric中提供的数据结构以及数据服务都是在这个目录下定义的

节点间通信的协议

grpc: protobuffer + rpc

jsonrpc: json + rpc

4.从入口开始

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 6

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 7

func main() {

  1. kingpin.Version("0.0.1")
  2. switch kingpin.MustParse(app.Parse(os.Args\[1:\])) \{
  3. // "start" command
  4. case start.FullCommand():
  5. logger.Infof("Starting %s", metadata.GetVersionInfo())
  6. // 载入配置信息
  7. conf := config.Load()
  8. // 初始化日志级别
  9. initializeLoggingLevel(conf)
  10. // 初始化profile
  11. initializeProfilingService(conf)
  12. // 初始化grpc服务端
  13. grpcServer := initializeGrpcServer(conf)
  14. // 载入msp证书
  15. initializeLocalMsp(conf)
  16. // msp证书用于签名者实例化
  17. signer := localmsp.NewSigner()
  18. // 初始化多链manager
  19. manager := initializeMultiChainManager(conf, signer)
  20. // 实例化服务实现
  21. server := NewServer(manager, signer)
  22. // 绑定服务器 + 服务实现
  23. ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
  24. logger.Info("Beginning to serve requests")
  25. // 启动服务
  26. grpcServer.Start()
  27. // "version" command
  28. case version.FullCommand():
  29. fmt.Println(metadata.GetVersionInfo())
  30. \}

}

需要关注的是 初始化多链manager以及 服务实现

func initializeMultiChainManager(conf *config.TopLevel, signer crypto.LocalSigner) multichain.Manager {

  1. // 创建账本工厂 临时区块 ,提供三种方式 ,file(文件), json, ram(内存),一般使用基于文件的
  2. lf, \_ := createLedgerFactory(conf)
  3. // Are we bootstrapping?
  4. // 检查是否有链的存在?
  5. if len(lf.ChainIDs()) == 0 \{
  6. // 如果没有,启动第一次引导
  7. initializeBootstrapChannel(conf, lf)
  8. \} else \{
  9. logger.Info("Not bootstrapping because of existing chains")
  10. \}
  11. // 实例化共识机制
  12. consenters := make(map\[string\]multichain.Consenter)
  13. consenters\["solo"\] = solo.New()
  14. consenters\["kafka"\] = kafka.New(conf.Kafka.TLS, conf.Kafka.Retry, conf.Kafka.Version)
  15. // 实例化manager
  16. return multichain.NewManagerImpl(lf, consenters, signer)

}

//成员变量broadcast和deliver实际上是两个interface,它们都各自封装了一个Handler方法

type server struct {

  1. bh broadcast.Handler // 交易收集,客户端向orderer提供交易的时候,都会被这个命令处理
  2. dh deliver.Handler // 区块扩散,通过它向其他组织的主节点 进行区块广播

}

// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader

//NewServer 实现一个内部类

func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer {

  1. s := &server\{
  2. dh: deliver.NewHandlerImpl(deliverSupport\{Manager: ml\}),
  3. bh: broadcast.NewHandlerImpl(broadcastSupport\{
  4. Manager: ml,
  5. ConfigUpdateProcessor: configupdate.New(ml.SystemChannelID(), configUpdateSupport\{Manager: ml\}, signer),
  6. \}),
  7. \}
  8. return s

}

server类提供了两个方法,分别是Broadcast方法和Deliver方法,前者主要是收集交易,后者主要是广播区块,server 为什么会有这两个方法呢,我们之前看main函数的时候,有一步是 绑定服务器+服务实现, 这是jrpc提供的方法,主要是将jrpc提供的服务与具体的服务实现绑定起来

// Broadcast receives a stream of messages from a client for ordering

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {

  1. logger.Debugf("Starting new Broadcast handler”)
  2. //defer是为了防止异常崩溃
  3. defer func() \{
  4. if r := recover(); r != nil \{
  5. logger.Criticalf("Broadcast client triggered panic: %s\\n%s", r, debug.Stack())
  6. \}
  7. logger.Debugf("Closing Broadcast stream")
  8. \}()
  9. return s.bh.Handle(srv)

}

// Deliver sends a stream of blocks to a client after ordering

func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {

  1. logger.Debugf("Starting new Deliver handler")
  2. defer func() \{
  3. if r := recover(); r != nil \{
  4. logger.Criticalf("Deliver client triggered panic: %s\\n%s", r, debug.Stack())
  5. \}
  6. logger.Debugf("Closing Deliver stream")
  7. \}()
  8. return s.dh.Handle(srv)

}

5.Manager 源码阅读

manager.go 路径是 fabric/orderer/multichain/manager,首先看manager接口的定义

// Manager coordinates the creation and access of chains

type Manager interface {

  1. // GetChain retrieves the chain support for a chain (and whether it exists)
  2. // 获取链对象,需要注意的是这里不是直接对链进行操作,而是生成 ChainSupport 接口
  3. GetChain(chainID string) (ChainSupport, bool)
  4. // SystemChannelID returns the channel ID for the system channel
  5. // 获取系统链名,系统链的主要作用是 引导其他链的生成
  6. SystemChannelID() string
  7. // NewChannelConfig returns a bare bones configuration ready for channel
  8. // creation request to be applied on top of it
  9. // 生成或者更新链的配置 (new / update)
  10. NewChannelConfig(envConfigUpdate \*cb.Envelope) (configtxapi.Manager, error)

}

// 配置资源,每个链在生成的时候都会有自己的配置,而这个配置就被包裹在配置资源里面

type configResources struct {

  1. configtxapi.Manager

}

//定义一个方法,获取跟orderer资源相关的配置

func (cr *configResources) SharedConfig() config.Orderer {

  1. oc, ok := cr.OrdererConfig()
  2. if !ok \{
  3. logger.Panicf("\[channel %s\] has no orderer configuration", cr.ChainID())
  4. \}
  5. return oc

}

// 账本资源类,包含了配置资源,以及一个账本的读写对象

type ledgerResources struct {

  1. \*configResources
  2. ledger ledger.ReadWriter

}

// manager的实现类

type multiLedger struct {

  1. chains map\[string\]\*chainSupport // 多链对象,manager包含的所有链表的对象
  2. lock sync.Mutex
  3. consenters map\[string\]Consenter // 共识机制,现阶段所支持的共识机制
  4. ledgerFactory ledger.Factory // 账本读写工厂
  5. signer crypto.LocalSigner // 签名
  6. systemChannelID string // 系统链名
  7. systemChannel \*chainSupport // 系统链

}

// 获取链最新配置交易

func getConfigTx(reader ledger.Reader) *cb.Envelope {

  1. //首先获取ledger的最新的一个区块
  2. lastBlock := ledger.GetBlock(reader, reader.Height()-1)
  3. //找到最新配置交易所在的区块
  4. index, err := utils.GetLastConfigIndexFromBlock(lastBlock)
  5. if err != nil \{
  6. logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
  7. \}
  8. configBlock := ledger.GetBlock(reader, index)
  9. if configBlock == nil \{
  10. logger.Panicf("Config block does not exist")
  11. \}
  12. //读取最新的配置交易
  13. return utils.ExtractEnvelopeOrPanic(configBlock, 0)

}

// NewManagerImpl produces an instance of a Manager

func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consenter, signer crypto.LocalSigner) Manager {

  1. ml := &multiLedger\{
  2. chains: make(map\[string\]\*chainSupport),
  3. ledgerFactory: ledgerFactory,
  4. consenters: consenters,
  5. signer: signer,
  6. \}
  7. // 读取本地存储的链的ID
  8. existingChains := ledgerFactory.ChainIDs()
  9. for \_, chainID := range existingChains \{ // 循环
  10. // 实例化账本读对象 read ledger
  11. rl, err := ledgerFactory.GetOrCreate(chainID)
  12. if err != nil \{
  13. logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
  14. \}
  15. // 链最新配置交易
  16. configTx := getConfigTx(rl)
  17. if configTx == nil \{
  18. logger.Panic("Programming error, configTx should never be nil here")
  19. \}
  20. // 绑定配置与读写对象
  21. ledgerResources := ml.newLedgerResources(configTx)
  22. chainID := ledgerResources.ChainID()
  23. // 是否有创建其他链的权限
  24. if \_, ok := ledgerResources.ConsortiumsConfig(); ok \{
  25. if ml.systemChannelID != "" \{
  26. logger.Panicf("There appear to be two system chains %s and %s", ml.systemChannelID, chainID)
  27. \}
  28. chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
  29. ledgerResources,
  30. consenters,
  31. signer)
  32. logger.Infof("Starting with system channel %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
  33. ml.lock.Lock()
  34. ml.chains\[chainID\] = chain
  35. ml.lock.Unlock()
  36. ml.systemChannelID = chainID
  37. ml.systemChannel = chain
  38. // We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
  39. defer chain.start() // 延迟启动
  40. \} else \{
  41. logger.Debugf("Starting chain: %s", chainID)
  42. chain := newChainSupport(createStandardFilters(ledgerResources),
  43. ledgerResources,
  44. consenters,
  45. signer)
  46. ml.lock.Lock()
  47. ml.chains\[chainID\] = chain
  48. ml.lock.Unlock()
  49. chain.start() // 启动
  50. \}
  51. \}
  52. if ml.systemChannelID == "" \{
  53. logger.Panicf("No system chain found. If bootstrapping, does your system channel contain a consortiums group definition?")
  54. \}
  55. return ml

}

6.ChainSuport 源码阅读

7.区块切割和solo模式源码阅读

8.交易收集和区块扩散源码阅读

9.共识机制总结

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 8

main.go 是orderer节点的初始化的流程,然后拿到Manager,这里是多链的控制中枢,所有对链的操作都要从这里拿到CHainSupport对象的实例,

最后是ChainSupport对象,它是链对象的帮助接口,可以说是链的代理,而且也是与链一一对应的, 然后是区块切割的代码,在区块切割中,

重点是 order的方法,这个方法的逻辑就是判断当前排序节点的交易是否应该被切割到一个新的区块链,如果是的话就调用cut方法进行切割。而

solo共识是我们理解fabric共识的一个最好的例子,它将区块链这样一个分布式系统简化成一个单机版的中心化系统,这样理解起来压力就小了很多。最后看的是交易收集以及区块扩散的内容,而这正是我们一开始入口中gRPCserver的两个接口的实现。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0FuZHk3MTA2NjA1NDE_size_16_color_FFFFFF_t_70 9

发表评论

表情:
评论列表 (有 0 条评论,103人围观)

还没有评论,来说两句吧...

相关阅读