Golang操作BoltDB数据库

SuKai March 2, 2022

boltdb是一个go语言开发的嵌入式kv数据库。

今天我们一起看看golang如何操作BoltDB,再看看Etcd是如何操作BoltDB的,Etcd对BoltDB做了很多优化,复杂度提高很多。

Etcd是一个基于Raft开发的分布式key-value存储。Kubernetes使用Etcd来作为集群数据的存储。Etcd存储层包含预写日志(WAL)、快照(Snapshot)、boltdb。其中WAL与Snapshot实现了故障恢复和数据回滚或重做,让数据尽量不丢失,boltdb则保存了集群元数据和用户写入的数据。

使用场景

在自动驾驶汽车算法模型开发中,需要大量的2D图片和3D点云图片,我们将这些图片上传到打标系统,进行数据标注,标注的数据用于算法模型的数据集。

这些图片文件的上传,需要有一个有状态的客户端来完成文件上传工作,客户端可以在本地创建上传任务,记录上传位置,任务启停等功能。

这里我们使用go来开发客户端,选择boltdb这种简单的嵌入式kv数据库作为存储。

BoltDB基本概念

DB数据库文件

BoltDB的数据库只有一个文件,一个进程打开此文件后,其他进程无法使用,只有等这个进程释放文件锁。对应关系型数据库中database。

Bucket

对应关系型数据库中的table,操作有CreateBucket, CreateBucketIfNotExists, DeleteBucket。

Key/Value键值对

boltdb中键值对都是使用字节数组值进行存储,keys以字节排序的顺序存储在一个bucket中。

Transaction事务

boltdb在某一时刻,只允许一个读写事务或者允许多个只读事务。用户可以通过db的Begin方法启动一个事务,通过Rollback和Commit方法自己控制提交和回滚,Close关闭事务。同时boltdb还提供了内置隐式事务Update, View, Batch方法。

读写事务

db.Update()启动一个读写事务,可重复读的事务。return error,回滚整个修改,return nil提交修改。

只读事务

db.View()打开一个只读事务,无法做写入操作。

批处理读写事务

db.Batch()多次读写事务合并为一次事务,使用上和Update读写事务相同,boltdb自动将其分批,分批写入磁盘减少并发读写事务等待磁盘I/O的开销。

代码学习

普通使用案例

定义Client接口,实现CreateBucketIfNotExists创建Bucket,Put写入更新数据,Delete删除数据,Range查询单条数据或数据列表,ForEach遍历数据。可以看到只使用了两种不同的事务Update和View来完成操作。

主要操作有三种Get, Put, Delete,这里只用到了Put,Delete操作,通过Range获取单条或者多条数据,另外通过Cursor游标,Seek偏移,Next来遍历数据。

package boltdb

import (
	"bytes"
	"math"

	"go.etcd.io/bbolt"
	bolt "go.etcd.io/bbolt"
	"go.uber.org/zap"
)

type client struct {
	db  *bbolt.DB
	log *zap.Logger
}

type Client interface {
	CreateBucketIfNotExists(bucket Bucket)
	Put(bucketType Bucket, key, value []byte)
	Delete(bucketType Bucket, key []byte)
	Range(bucketType Bucket, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte)
	ForEach(bucketType Bucket, visitor func(k, v []byte) error)
	Close() error
}

func NewClient(path string, log *zap.Logger) Client {
	db, err := bbolt.Open(path, 0600, nil)
	if err != nil {
		log.Panic("failed to open database", zap.String("path", path), zap.Error(err))
	}

	return &client{db: db, log: log}
}

func (c *client) CreateBucketIfNotExists(bucketType Bucket) {
	_ = c.db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists(bucketType.Name())
		if err != nil && err != bolt.ErrBucketExists {
			c.log.Error(
				"failed to create a bucket",
				zap.Stringer("bucket-name", bucketType),
				zap.Error(err),
			)
		}
		return err
	})
}

func (c *client) Put(bucketType Bucket, key, value []byte) {
	_ = c.db.Update(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(bucketType.Name())
		if bucket == nil {
			c.log.Error("failed to find a bucket",
				zap.String("bucket-name", bucketType.String()))
		}

		err := bucket.Put(key, value)
		if err != nil {
			c.log.Error(
				"failed to write to a bucket",
				zap.Stringer("bucket-name", bucketType),
				zap.Error(err),
			)
		}
		return err
	})
}

func (c *client) Delete(bucketType Bucket, key []byte) {
	_ = c.db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucketType.Name())
		if b == nil {
			return bolt.ErrBucketNotFound
		}

		err := b.Delete(key)
		if err != nil {
			c.log.Error(
				"failed to delete a key",
				zap.Stringer("bucket-name", bucketType),
				zap.Error(err),
			)
		}

		return err
	})
}

func (c *client) Range(bucketType Bucket, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	if limit <= 0 {
		limit = math.MaxInt64
	}

	var isMatch func(b []byte) bool
	if len(endKey) > 0 {
		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
		limit = 1
	}

	_ = c.db.View(func(tx *bolt.Tx) error {
		cursor := tx.Bucket(bucketType.Name()).Cursor()

		for ck, cv := cursor.Seek(key); ck != nil && isMatch(ck); ck, cv = cursor.Next() {
			vs = append(vs, cv)
			keys = append(keys, ck)
			if limit == int64(len(keys)) {
				break
			}
		}

		return nil
	})

	return keys, vs
}

func (c *client) ForEach(bucketType Bucket, visitor func(k, v []byte) error) {
	_ = c.db.View(func(tx *bolt.Tx) error {
		return tx.Bucket(bucketType.Name()).ForEach(visitor)
	})
}

func (c *client) Close() error {
	return c.db.Close()
}

Etcd代码

在Etcd代码中实现了读写分离,可以看到Etcd的数据读写都是通过boltdb原生的事务实现的,同时为了减少磁盘写入,batch批量进行写操作。同时Etcd增加了对boltdb的Cache功能,在Cache中读写数据。

下面我们粗略看一下Etcd代码:

1,Backend接口完成对boltdb的操作

2,其中ReadTx接口完成boltdb的读操作

3,其中BatchTx接口完成boltdb的批量写操作

Backend接口

Etcd定义了一个Backend接口来完成boltdb的操作,其中包含ReadTx接口实现只读事务,BatchTx接口实现读写事务。

可以看到在构造backend时:

1,readTx有两个buffer,一个是txReadBuffer,一个是txReadBufferCache,这两个buffer一个用于ReadTx,一个用于ConcurrentReadTx,txReadBufferCache是对txReadBuffer的镜像。

2,batchTx是使用的batchTxBuffered,也就是写操作调用batchTxBuffered的方法,下面介绍batchTxBuffered方法时可以看到,batchTxBuffered的写操作都是先写boltdb,再写buffer,同时还会将buffer的数据同步到readTx的txReadBuffer中去。

3,backend的run方法中,会有一个定时器,根据batchInterval定时将批量写操作进行提交写入磁盘,同时关闭时也进行提交。下面介绍commit时,可以发现有三种情况会提交磁盘:1,batchInterval时间间隔到期。2,BatchLimit写操作数量达到限制。3,关闭时。

type Backend interface {
   // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
   ReadTx() ReadTx
   BatchTx() BatchTx
   // ConcurrentReadTx returns a non-blocking read transaction.
   ConcurrentReadTx() ReadTx

   Snapshot() Snapshot
   Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
   // Size returns the current size of the backend physically allocated.
   // The backend can hold DB space that is not utilized at the moment,
   // since it can conduct pre-allocation or spare unused space for recycling.
   // Use SizeInUse() instead for the actual DB size.
   Size() int64
   // SizeInUse returns the current size of the backend logically in use.
   // Since the backend can manage free space in a non-byte unit such as
   // number of pages, the returned value can be not exactly accurate in bytes.
   SizeInUse() int64
   // OpenReadTxN returns the number of currently open read transactions in the backend.
   OpenReadTxN() int64
   Defrag() error
   ForceCommit()
   Close() error
}


func newBackend(bcfg BackendConfig) *backend {
	
	b := &backend{
		bopts: bopts,
		db:    db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,

		readTx: &readTx{
			baseReadTx: baseReadTx{
				buf: txReadBuffer{
					txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
					bufVersion: 0,
				},
				buckets: make(map[BucketID]*bolt.Bucket),
				txWg:    new(sync.WaitGroup),
				txMu:    new(sync.RWMutex),
			},
		},
		txReadBufferCache: txReadBufferCache{
			mu:         sync.Mutex{},
			bufVersion: 0,
			buf:        nil,
		},

	}

	b.batchTx = newBatchTxBuffered(b)


	go b.run()
	return b
}


func (b *backend) run() {
	defer close(b.donec)
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
		if b.batchTx.safePending() != 0 {
			b.batchTx.Commit()
		}
		t.Reset(b.batchInterval)
	}
}

ReadTx只读接口

baseReadTx实现了ReadTx接口,可以看到baseReadTx中有两个读写锁,用于保护对txReadBuffer和boltdb的数据访问。txReadBuffer用于是上面提到的Cache缓存。

type ReadTx interface {
   Lock()
   Unlock()
   RLock()
   RUnlock()

   UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
   UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}

type baseReadTx struct {
	// mu protects accesses to the txReadBuffer
	mu  sync.RWMutex
	buf txReadBuffer

	// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
	// txMu protects accesses to buckets and tx on Range requests.
	txMu    *sync.RWMutex
	tx      *bolt.Tx
	buckets map[BucketID]*bolt.Bucket
	// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
	txWg *sync.WaitGroup
}

UnsafeForEach方法

实现对bucket遍历调用visitor处理数据。

步骤主要为:

1,从buf中遍历bucket数据,调用getDups将数据写入到dups Map中。

2,从boltdb中遍历bucket数据,调用visitNoDup将不在buf中的数据调用visitor进行数据处理。

3,再次从buf中遍历bucket数据,调用visitor进行数据处理。

func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
   dups := make(map[string]struct{})
   getDups := func(k, v []byte) error {
      dups[string(k)] = struct{}{}
      return nil
   }
   visitNoDup := func(k, v []byte) error {
      if _, ok := dups[string(k)]; ok {
         return nil
      }
      return visitor(k, v)
   }
   if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
      return err
   }
   baseReadTx.txMu.Lock()
   err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
   baseReadTx.txMu.Unlock()
   if err != nil {
      return err
   }
   return baseReadTx.buf.ForEach(bucket, visitor)
}

UnsafeRange方法

UnsafeRange实现获取bucket中单条或多条数据

步骤主要为:

1,从buf中获取bucket的数据

2,从boltdb中获取bucket的数据

3,合并buf和boltdb数据

func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
   if endKey == nil {
      // forbid duplicates for single keys
      limit = 1
   }
   if limit <= 0 {
      limit = math.MaxInt64
   }
   if limit > 1 && !bucketType.IsSafeRangeBucket() {
      panic("do not use unsafeRange on non-keys bucket")
   }
   keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
   if int64(len(keys)) == limit {
      return keys, vals
   }

   // find/cache bucket
   bn := bucketType.ID()
   baseReadTx.txMu.RLock()
   bucket, ok := baseReadTx.buckets[bn]
   baseReadTx.txMu.RUnlock()
   lockHeld := false
   if !ok {
      baseReadTx.txMu.Lock()
      lockHeld = true
      bucket = baseReadTx.tx.Bucket(bucketType.Name())
      baseReadTx.buckets[bn] = bucket
   }

   // ignore missing bucket since may have been created in this batch
   if bucket == nil {
      if lockHeld {
         baseReadTx.txMu.Unlock()
      }
      return keys, vals
   }
   if !lockHeld {
      baseReadTx.txMu.Lock()
   }
   c := bucket.Cursor()
   baseReadTx.txMu.Unlock()

   k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
   return append(k2, keys...), append(v2, vals...)
}

BatchTx接口

batchTx,batchTxBuffered实现BatchTx接口,分别针对boltdb和buffer数据的操作,可以看到batchTx中有一个tx事务指针,有一个sync.Mutex互斥锁,有一个pending计数器,这个pending计数器用于写操作的计数,每一次put/delete操作pending++,当数量到达backend.batchLimit时,会触发一次commit。

type BatchTx interface {
   ReadTx
   UnsafeCreateBucket(bucket Bucket)
   UnsafeDeleteBucket(bucket Bucket)
   UnsafePut(bucket Bucket, key []byte, value []byte)
   UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
   UnsafeDelete(bucket Bucket, key []byte)
   // Commit commits a previous tx and begins a new writable one.
   Commit()
   // CommitAndStop commits the previous tx and does not create a new one.
   CommitAndStop()
}

type batchTx struct {
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int
}

type batchTxBuffered struct {
	batchTx
	buf txWriteBuffer
}

Commit提交

上面提到Etcd都是使用原生的boltdb事务来进行数据操作的,并且是批量写入磁盘的。在Etcd构造backend实例时会第一次调用commit,之后Etcd每次调用commit时,会根据stop参数判断,只要不是关闭,都会调用begin再开启一个新的事务赋值给batchTx.tx。这样Etcd的操作都在一个batch事务里。

func (t *batchTx) Commit() {
   t.Lock()
   t.commit(false)
   t.Unlock()
}

func (t *batchTxBuffered) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}

func (t *batchTx) CommitAndStop() {
	t.Lock()
	t.commit(true)
	t.Unlock()
}

func (t *batchTx) commit(stop bool) {
	// commit the last tx
	if t.tx != nil {
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()

		// gofail: var beforeCommit struct{}
		err := t.tx.Commit()
		// gofail: var afterCommit struct{}

		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
		commitSec.Observe(time.Since(start).Seconds())
		atomic.AddInt64(&t.backend.commits, 1)

		t.pending = 0
		if err != nil {
			t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
		}
	}
	if !stop {
		t.tx = t.backend.begin(true)
	}
}

batchTxBuffered的Put方法

写入操作都会同时写buffer和boltdb

func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
   t.batchTx.UnsafePut(bucket, key, value)
   t.buf.put(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
   t.batchTx.UnsafeSeqPut(bucket, key, value)
   t.buf.putSeq(bucket, key, value)
}

batchTxBuffered的Unlock方法

主要调用writeback实现从写buffer到读buffer的数据同步

func (t *batchTxBuffered) Unlock() {
   if t.pending != 0 {
      t.backend.readTx.Lock() // blocks txReadBuffer for writing.
      t.buf.writeback(&t.backend.readTx.buf)
      t.backend.readTx.Unlock()
      if t.pending >= t.backend.batchLimit {
         t.commit(false)
      }
   }
   t.batchTx.Unlock()
}

总结一下,Etcd对boltdb使用更深入,自已管理事务,实现读写缓存,读写缓存之间进行同步,提高了性能,减少了磁盘I/O。

至此,我们了解了golang操作的boltdb方法,对boltdb的知识有了一些初步了解。我们也要开始使用boltdb作为我们的客户端数据库了。