7 Etcd服务端实现
7.1 Etcd启动
Etcd有多种启动方式,我们从最简单的方式入手,也就是从embed的etcd.go开始启动,最后会启动EtcdServer。
先看看etcd.go中的启动代码:
func StartEtcd(inCfg *Config) (e *Etcd, err error)
从StartEtcd方法启动etcd服务,参数是初始配置信息config,启动集群间监听进程和客户端监听进程,最后启动EtcdServer。
主要代码:
e = &Etcd{ cfg: *inCfg, stopc: make(chan struct{})} cfg := &e.cfg if e.Peers, err = startPeerListeners(cfg); err != nil { return } if e.sctxs, err = startClientListeners(cfg); err != nil { return } if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { return } e.Server.Start()
startPeerListeners启动Peer监听,等待集群中其他机器连接自己。startClientListeners启动客户端监听Socket,等待客户端请求并响应。最后调用Start方法启动EtcdServer。
7.2 EtcdServer
EtcdServer位于etcdserver/server.go,定义了Server接口和EtcdServer对象。EtcdServer从逻辑上讲代表了一个完整的Etcd服务。
图7.1 Etcd服务端的功能示意图
Etcd服务端主要提供两大类客户端接口:
(1)集群配置
由memberHandler负责,提供添加集群成员,删除成员,更新成员信息三种接口服务。
(2)KV键值:由keysHandler负责。
KeysHandler接收到客户端请求后,调用EtcdServer的Do方法处理请求,Watcher类的客户端请求信息同样包含在keysHandler中了。KV键值响应主要在v2_server.go中定义,etcd新版本同时还提供了v3操作命令集,本文不讨论v3的源码实现。
7.2.1 接口定义
Etcdserver/server.go中定义了Server接口,是服务端的主接口,其中Do方法处理客户端请求。
Server.go中定义了EtcdServer对象,它是Server接口的实现类。Server中的Do接口是专门用来响应客户端请求的。
Server接口定义:
-
start
读取配置文件,启动本Server。
-
stop
停止本Server
-
ID
获取本节点server的ID,集群中所有的机器都有唯一ID,用于标识自己。
-
Leader
获取leader的ID
-
Do
处理客户群请求,返回处理结果。
定义:
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error)
在server.go中并没有看到Go接口的实现,其实它是在v2_server.go文件中定义的。
-
Process
Process(ctx context.Context, m raftpb.Message) error
处理Raft消息。
-
AddMember
向Etcd集群中增加一台服务器,新增服务器的ID必须唯一标识。
-
RemoveMember
从集群删除一台服务器,删除服务器的ID必须已经存在于集群中。
-
UpdateMember
修改集群成员属性,如果成员ID不存在则返回ErrIDNotFound错误。
7.2.2 实体定义
EtcdServer表示一个独立运行的Etcd节点。
type EtcdServer struct { inflightSnapshots int64 appliedIndex uint64 committedIndex uint64. consistIndex consistentIndex Cfg *ServerConfig readych chan struct{} r raftNode snapCount uint64 w wait.Wait readMu sync.RWMutex readwaitc chan struct{} readNotifier *notifier stop chan struct{} stopping chan struct{} done chan struct{} errorc chan error id types.ID attributes membership.Attributes cluster *membership.RaftCluster store store.Store applyV2 ApplierV2 applyV3 applierV3 applyV3Base applierV3 applyWait wait.WaitTime kv mvcc.ConsistentWatchableKV lessor lease.Lessor bemu sync.Mutex be backend.Backend authStore auth.AuthStore alarmStore *alarm.AlarmStore stats *stats.ServerStats lstats *stats.LeaderStats SyncTicker *time.Ticker compactor *compactor.Periodic peerRt http.RoundTripper reqIDGen *idutil.Generator forceVersionC chan struct{} wgMu sync.RWMutex wg sync.WaitGroup ctx context.Context cancel context.CancelFunc leadTimeMu sync.RWMutex leadElectedTime time.Time }
7.2.3 Do
Do定义在v2_server.go中,处理客户群请求包,调用raftNode的Propose方法。在上一章已经介绍过。
对于KV键值请求,Do方法是在etcdServer/v2_server.go中定义的,它的相关代码逻辑如下:
func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { r.ID = s.reqIDGen.Next() if r.Method == "GET" && r.Quorum { r.Method = "QGET" } v2api := (v2API)(&v2apiStore{ s}) switch r.Method { case "POST": return v2api.Post(ctx, &r) case "PUT": return v2api.Put(ctx, &r) case "DELETE": return v2api.Delete(ctx, &r) case "QGET": return v2api.QGet(ctx, &r) case "GET": return v2api.Get(ctx, &r) case "HEAD": return v2api.Head(ctx, &r) } return Response{}, ErrUnknownMethod }
可以看到对客户端的KV键值请求,最终是通过v2apiStore的相关方法来实现。客户端的命令前缀为"/v2/keys"。支持的命令有以下这些:
-
GET/QGET:读取键值
-
POST:创建一个新的KV键值
-
PUT:重新设置键值的值
-
DELETE:删除已有键值
v2apiStore包含了EtcdServer引用。
type v2apiStore struct{ s *EtcdServer }
除了GET命令,其余Post,Put和Delete每个写操作请求最后都是通过processRaftRequest方法来处理的。
我们先看看GET命令的处理:
func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) { if r.Wait { wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since) if err != nil { return Response{}, err } return Response{ Watcher: wc}, nil } ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted) if err != nil { return Response{}, err } return Response{ Event: ev}, nil }
看到对于普通的GET操作,直接调用store.Get方法获取KV值返回给客户端,如果是Watcher操作,则返回Watcher给客户端,客户端后续通过Watcher接口读取变化值。
对于POST,PUT,DELETE命令,走下述Propose流程处理。
图7.2 Propose流程示意图
比如"DELETE"命令。
func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) { return a.processRaftRequest(ctx, r) }
processRaftRequest方法的源码如下:
func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) { data, err := r.Marshal() if err != nil { return Response{}, err } ch := a.s.w.Register(r.ID) start := time.Now() a.s.r.Propose(ctx, data) proposalsPending.Inc() defer proposalsPending.Dec() select { case x := <-ch: resp := x.(Response) return resp, resp.err case <-ctx.Done(): proposalsFailed.Inc() a.s.w.Trigger(r.ID, nil) // GC wait return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start) case <-a.s.stopping: } return Response{}, ErrStopped }
-
data, err := r.Marshal()语句:
这条语句从pb.request得到请求数据data
-
ch := a.s.w.Register(r.ID)语句:
注册chain,一直等待直到ch有响应数据。
Register方法是wait的Register方法。该方法直到调用wait的Trigger方法后才会有数据从而触发select在该Register Id上线程被唤醒。Wait在pkg/wait中定义。
-
a.s.r.Propose(ctx, data)
Propose方法在node中定义,raftNode在etcdserver/node.go文件中。Propose将写事务请求发给Leader,等待集群间同步。Propose集群间同步消息完成后会唤醒a.s.w.Register语句。
调用raft/node的Propose方法处理写事务请求,进一步调用step方法将写事务封装成MsgProp消息并传递给集群中其他机器。
func (n *node) Propose(ctx context.Context, data []byte) error { return n.step(ctx, pb.Message{ Type: pb.MsgProp, Entries: []pb.Entry{ { Data: data}}}) }
step会调用StepFunc函数来处理MsgProp消息,根据leader,follower,candidate等运行状态分别调用不同的实现函数。
-
select语句
select … case …语句类似于Socket通信中的select语句,它的含义是只要任意一个case语句有数据返回就往下执行,否则就阻塞在这里让出CPU给其他线程执行。
case x := <-ch:当ch有值时,将ch赋值给x变量,同时唤醒case语句被执行,这里将执行以下代码:
resp := x.(Response) return resp, resp.err
此时将ch中的返回结果Response回复给调用者(即客户端)。
case <-ctx.Done():说明上下文被中断,Context的Done()被触发,此时写事务执行失败,返回空Response。
7.2.4 初始化
Etcd服务端主要由5大组件构成,他们的分工如下:
-
etcdServer:主进程,相当于整个Etcd的容器,包含了raftNode,WAL,snapshotter等多个关键组件。
-
raftNode:执行raft协议,保证写事务的集群一致性维护。
-
Store:管理维护Etcd数据库
-
Wal:管理事务日志
-
Snapshotter:负责数据快照,管理store数据库在内存中和磁盘上的相互转换。
raftNode除了负责集群间raft消息交互,还负责事务和快照的存储,保持数据一致性。
Etcd定义了一个storage数据结构,一起负责事务和快照。
type storage struct { *wal.WAL *snap.Snapshotter }
storage中没有指定WAL和Snapshotter的变量名称,这两个类的方法都可直接通过storage来调用,比如WAL的Save方法,可以通过storage.Save来调用,也可以通过storage.WAL.Save来调用,这两者是等价的,在阅读源码的时候要注意这一点,否则对Go语法不太了解的读者会感到迷惑。
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { st := store.New(StoreClusterPrefix, StoreKeysPrefix) var ( w *wal.WAL n raft.Node s *raft.MemoryStorage id types.ID cl *membership.RaftCluster ) haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) bepath := filepath.Join(cfg.SnapDir(), databaseFilename) beExist := fileutil.Exist(bepath) switch { case haveWAL: snapshot, err = ss.Load() if snapshot != nil { if err = st.Recovery(snapshot.Data); err != nil { plog.Panicf("recovered store from snapshot error: %v", err) } } cfg.Print() if !cfg.ForceNewCluster { id, cl, n, s, w = restartNode(cfg, snapshot) } else { id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot) } cl.SetStore(st) cl.SetBackend(be) cl.Recover(api.UpdateCapability) } if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil { return nil, fmt.