memberlist库的简单用法如下,注意下面使用for循环来执行 list.Join ,原因是一开始各节点都没有runing,直接执行 Join 会出现连接拒绝的错误。

package main
import (
func main() {
	/* Create the initial memberlist from a safe configuration.
	   Please reference the godoc for other default config types.
	list, err := memberlist.Create(memberlist.DefaultLocalConfig())
	if err != nil {
		panic("Failed to create memberlist: " + err.Error())
	t := time.NewTicker(time.Second * 5)
	for {
		select {
		case <-t.C:
			// Join an existing cluster by specifying at least one known member.
			n, err := list.Join([]string{""})
			if err != nil {
				fmt.Println("Failed to join cluster: " + err.Error())
			fmt.Println("member number is:", n)
			goto END
	for {
		select {
		case <-t.C:
			// Ask for members of the cluster
			for _, member := range list.Members() {
				fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
	// Continue doing whatever you need, memberlist will maintain membership
	// information in the background. Delegates can be used for receiving
	// events when members join or leave.


  • Create:根据入参配置创建一个 Memberlist ,初始化阶段 Memberlist 仅包含本节点状态。注意此时并不会连接到其他节点,执行成功之后就可以允许其他节点加入该memberlist。

  • Join:使用已有的 Memberlist 来尝试连接给定的主机,并与之同步状态,以此来加入某个cluster。执行该操作可以让其他节点了解到本节点的存在。最后返回成功建立连接的节点数以及错误信息,如果没有与任何节点建立连接,则返回错误。




memberlist.Create 的入参要求给出相应的 配置 信息, DefaultLocalConfig() 给出了通用的配置信息,但还需要实现相关接口来实现成员状态的同步以及用户数据的收发。注意下面有些接口是必选的,有些则可选:

type Config struct {
	// ...
	// Delegate and Events are delegates for receiving and providing
	// data to memberlist via callback mechanisms. For Delegate, see
	// the Delegate interface. For Events, see the EventDelegate interface.
	// The DelegateProtocolMin/Max are used to guarantee protocol-compatibility
	// for any custom messages that the delegate might do (broadcasts,
	// local/remote state, etc.). If you don't set these, then the protocol
	// versions will just be zero, and version compliance won't be done.
	Delegate                Delegate
	Events                  EventDelegate
	Conflict                ConflictDelegate
	Merge                   MergeDelegate
	Ping                    PingDelegate
	Alive                   AliveDelegate

memberlist使用如下 类型 的消息来同步集群状态和处理用户消息:

const (
	pingMsg messageType = iota
	userMsg // User mesg, not handled by us



type Delegate interface {
	// NodeMeta is used to retrieve meta-data about the current node
	// when broadcasting an alive message. It's length is limited to
	// the given byte size. This metadata is available in the Node structure.
	NodeMeta(limit int) []byte

	// NotifyMsg is called when a user-data message is received.
	// Care should be taken that this method does not block, since doing
	// so would block the entire UDP packet receive loop. Additionally, the byte
	// slice may be modified after the call returns, so it should be copied if needed

	// GetBroadcasts is called when user data messages can be broadcast.
	// It can return a list of buffers to send. Each buffer should assume an
	// overhead as provided with a limit on the total byte size allowed.
	// The total byte size of the resulting data to send must not exceed
	// the limit. Care should be taken that this method does not block,
	// since doing so would block the entire UDP packet receive loop.
	GetBroadcasts(overhead, limit int) [][]byte

	// LocalState is used for a TCP Push/Pull. This is sent to
	// the remote side in addition to the membership information. Any
	// data can be sent here. See MergeRemoteState as well. The `join`
	// boolean indicates this is for a join instead of a push/pull.
	LocalState(join bool) []byte

	// MergeRemoteState is invoked after a TCP Push/Pull. This is the
	// state received from the remote side and is the result of the
	// remote side's LocalState call. The 'join'
	// boolean indicates this is for a join instead of a push/pull.
	MergeRemoteState(buf []byte, join bool)


  • NotifyMsg:用于接收用户消息( userMsg )。注意不能阻塞该方法,否则会阻塞整个UDP/TCP报文接收循环。此外由于数据可能在方法调用时被修改,因此应该事先拷贝数据。

    该方法用于接收通过UDP/TCP方式发送的用户消息( userMsg ):

    注意UDP方式并不是立即发送的,它会随gossip周期性发送或在处理 pingMsg 等消息时发送从GetBroadcasts获取到的用户消息。

    func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error
    func (m *Memberlist) SendToAddress(a Address, msg []byte) error
    func (m *Memberlist) SendReliable(to *Node, msg []byte) error
  • GetBroadcasts:用于在gossip周期性调度或处理处理 pingMsg 等消息时携带用户消息,因此并不是即时的。通常会把需要发送的消息通过 TransmitLimitedQueue.QueueBroadcast 保存起来,然后在发送时通过 TransmitLimitedQueue.GetBroadcasts 获取需要发送的消息。见下面 TransmitLimitedQueue 的描述。

  • LocalState:用于TCP Push/Pull,用于向远端发送除成员之外的信息(可以发送任意数据),用于定期同步成员状态。参数 join 用于表示将该方法用于join阶段,而非push/pull。

  • MergeRemoteState:TCP Push/Pull之后调用,接收到远端的状态(即远端调用LocalState的结果)。参数 join 用于表示将该方法用于join阶段,而非push/pull。



仅用于接收成员的joining 和leaving通知,可以用于更新本地的成员状态信息。

type EventDelegate interface {
	// NotifyJoin is invoked when a node is detected to have joined.
	// The Node argument must not be modified.

	// NotifyLeave is invoked when a node is detected to have left.
	// The Node argument must not be modified.

	// NotifyUpdate is invoked when a node is detected to have
	// updated, usually involving the meta data. The Node argument
	// must not be modified.

ChannelEventDelegate 实现了简单的 EventDelegate 接口:

type ChannelEventDelegate struct {
  Ch chan<- NodeEvent



type ConflictDelegate interface {
	// NotifyConflict is invoked when a name conflict is detected
	NotifyConflict(existing, other *Node)


在集群执行merge操作时调用。 NotifyMerge 方法的参数 peers 提供了对端成员信息。 可以不实现该接口。

type MergeDelegate interface {
	// NotifyMerge is invoked when a merge could take place.
	// Provides a list of the nodes known by the peer. If
	// the return value is non-nil, the merge is canceled.
	NotifyMerge(peers []*Node) error


用于通知观察者完成一个ping消息( pingMsg )要花费多长时间。可以在 NotifyPingComplete 中(使用histogram)统计ping的执行时间。

type PingDelegate interface {
	// AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack
	AckPayload() []byte
	// NotifyPing is invoked when an ack for a ping is received
	NotifyPingComplete(other *Node, rtt time.Duration, payload []byte)


当接收到 aliveMsg 消息时调用的接口,可以用于添加日志和指标等信息。

type AliveDelegate interface {
	// NotifyAlive is invoked when a message about a live
	// node is received from the network.  Returning a non-nil
	// error prevents the node from being considered a peer.
	NotifyAlive(peer *Node) error



// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
type Broadcast interface {
	// Invalidates checks if enqueuing the current broadcast
	// invalidates a previous broadcast
	Invalidates(b Broadcast) bool

	// Returns a byte form of the message
	Message() []byte

	// Finished is invoked when the message will no longer
	// be broadcast, either due to invalidation or to the
	// transmit limit being reached

Broadcast 接口通常作为 TransmitLimitedQueue.QueueBroadcast 的入参:

func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
	q.queueBroadcast(b, 0)


type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return []byte(b) }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()


TransmitLimitedQueue主要用于处理广播消息。有两个主要的方法: QueueBroadcast 和 GetBroadcasts ,前者用于保存广播消息,后者用于在发送的时候获取需要广播的消息。随gossip周期性调度或在处理 pingMsg 等消息时调用 GetBroadcasts 方法。

// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which is calculated
	// based on the log of this.
	NumNodes func() int

	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int

	mu    sync.Mutex
	tq    *btree.BTree // stores *limitedBroadcast as btree.Item
	tm    map[string]*limitedBroadcast
	idGen int64



GossipInterval 周期性调度的有两个方法:

  • gossip :用于同步 aliveMsg 、 deadMsg 、 suspectMsg 消息
  • probe :用于使用 pingMsg 消息探测节点状态
// GossipInterval and GossipNodes are used to configure the gossip
	// behavior of memberlist.
	// GossipInterval is the interval between sending messages that need
	// to be gossiped that haven't been able to piggyback on probing messages.
	// If this is set to zero, non-piggyback gossip is disabled. By lowering
	// this value (more frequent) gossip messages are propagated across
	// the cluster more quickly at the expense of increased bandwidth.
	// GossipNodes is the number of random nodes to send gossip messages to
	// per GossipInterval. Increasing this number causes the gossip messages
	// to propagate across the cluster more quickly at the expense of
	// increased bandwidth.
	// GossipToTheDeadTime is the interval after which a node has died that
	// we will still try to gossip to it. This gives it a chance to refute.
	GossipInterval      time.Duration
	GossipNodes         int
	GossipToTheDeadTime time.Duration


  • 周期性同步:
    • 以 PushPullInterval 为周期,使用 Delegate.LocalState 和 Delegate.MergeRemoteState 以TCP方式同步用户信息;
    • 使用 Delegate.GetBroadcasts 随gossip发送用户信息。
  • 主动发送:使用 SendReliable 等方法实现主动发送用户消息。


alertmanager通过两种方式发送用户消息,即UDP方式和TCP方式。在alertmanager中,当要发送的数据大于 MaxGossipPacketSize/2 将采用TCP方式( SendReliable 方法),否则使用UDP方式( Broadcast 接口)。

func (c *Channel) Broadcast(b []byte) {
	b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b})
	if err != nil {

	if OversizedMessage(b) {
		select {
		case c.msgc <- b: //从c.msgc 接收数据,并使用SendReliable发送
			level.Debug(c.logger).Log("msg", "oversized gossip channel full")
	} else {

func OversizedMessage(b []byte) bool {
	return len(b) > MaxGossipPacketSize/2


这里 实现了一个简单的基于gossip管理集群信息,并通过TCP给集群成员发送信息的例子。