由 SuKai March 2, 2022
boltdb是一个go语言开发的嵌入式kv数据库。
今天我们一起看看golang如何操作BoltDB,再看看Etcd是如何操作BoltDB的,Etcd对BoltDB做了很多优化,复杂度提高很多。
Etcd是一个基于Raft开发的分布式key-value存储。Kubernetes使用Etcd来作为集群数据的存储。Etcd存储层包含预写日志(WAL)、快照(Snapshot)、boltdb。其中WAL与Snapshot实现了故障恢复和数据回滚或重做,让数据尽量不丢失,boltdb则保存了集群元数据和用户写入的数据。
使用场景
在自动驾驶汽车算法模型开发中,需要大量的2D图片和3D点云图片,我们将这些图片上传到打标系统,进行数据标注,标注的数据用于算法模型的数据集。
这些图片文件的上传,需要有一个有状态的客户端来完成文件上传工作,客户端可以在本地创建上传任务,记录上传位置,任务启停等功能。
这里我们使用go来开发客户端,选择boltdb这种简单的嵌入式kv数据库作为存储。
BoltDB基本概念
DB数据库文件
BoltDB的数据库只有一个文件,一个进程打开此文件后,其他进程无法使用,只有等这个进程释放文件锁。对应关系型数据库中database。
Bucket
对应关系型数据库中的table,操作有CreateBucket, CreateBucketIfNotExists, DeleteBucket。
Key/Value键值对
boltdb中键值对都是使用字节数组值进行存储,keys以字节排序的顺序存储在一个bucket中。
Transaction事务
boltdb在某一时刻,只允许一个读写事务或者允许多个只读事务。用户可以通过db的Begin方法启动一个事务,通过Rollback和Commit方法自己控制提交和回滚,Close关闭事务。同时boltdb还提供了内置隐式事务Update, View, Batch方法。
读写事务
db.Update()启动一个读写事务,可重复读的事务。return error,回滚整个修改,return nil提交修改。
只读事务
db.View()打开一个只读事务,无法做写入操作。
批处理读写事务
db.Batch()多次读写事务合并为一次事务,使用上和Update读写事务相同,boltdb自动将其分批,分批写入磁盘减少并发读写事务等待磁盘I/O的开销。
代码学习
普通使用案例
定义Client接口,实现CreateBucketIfNotExists创建Bucket,Put写入更新数据,Delete删除数据,Range查询单条数据或数据列表,ForEach遍历数据。可以看到只使用了两种不同的事务Update和View来完成操作。
主要操作有三种Get, Put, Delete,这里只用到了Put,Delete操作,通过Range获取单条或者多条数据,另外通过Cursor游标,Seek偏移,Next来遍历数据。
package boltdb
import (
"bytes"
"math"
"go.etcd.io/bbolt"
bolt "go.etcd.io/bbolt"
"go.uber.org/zap"
)
type client struct {
db *bbolt.DB
log *zap.Logger
}
type Client interface {
CreateBucketIfNotExists(bucket Bucket)
Put(bucketType Bucket, key, value []byte)
Delete(bucketType Bucket, key []byte)
Range(bucketType Bucket, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte)
ForEach(bucketType Bucket, visitor func(k, v []byte) error)
Close() error
}
func NewClient(path string, log *zap.Logger) Client {
db, err := bbolt.Open(path, 0600, nil)
if err != nil {
log.Panic("failed to open database", zap.String("path", path), zap.Error(err))
}
return &client{db: db, log: log}
}
func (c *client) CreateBucketIfNotExists(bucketType Bucket) {
_ = c.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucketType.Name())
if err != nil && err != bolt.ErrBucketExists {
c.log.Error(
"failed to create a bucket",
zap.Stringer("bucket-name", bucketType),
zap.Error(err),
)
}
return err
})
}
func (c *client) Put(bucketType Bucket, key, value []byte) {
_ = c.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(bucketType.Name())
if bucket == nil {
c.log.Error("failed to find a bucket",
zap.String("bucket-name", bucketType.String()))
}
err := bucket.Put(key, value)
if err != nil {
c.log.Error(
"failed to write to a bucket",
zap.Stringer("bucket-name", bucketType),
zap.Error(err),
)
}
return err
})
}
func (c *client) Delete(bucketType Bucket, key []byte) {
_ = c.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(bucketType.Name())
if b == nil {
return bolt.ErrBucketNotFound
}
err := b.Delete(key)
if err != nil {
c.log.Error(
"failed to delete a key",
zap.Stringer("bucket-name", bucketType),
zap.Error(err),
)
}
return err
})
}
func (c *client) Range(bucketType Bucket, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
if limit <= 0 {
limit = math.MaxInt64
}
var isMatch func(b []byte) bool
if len(endKey) > 0 {
isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
} else {
isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
limit = 1
}
_ = c.db.View(func(tx *bolt.Tx) error {
cursor := tx.Bucket(bucketType.Name()).Cursor()
for ck, cv := cursor.Seek(key); ck != nil && isMatch(ck); ck, cv = cursor.Next() {
vs = append(vs, cv)
keys = append(keys, ck)
if limit == int64(len(keys)) {
break
}
}
return nil
})
return keys, vs
}
func (c *client) ForEach(bucketType Bucket, visitor func(k, v []byte) error) {
_ = c.db.View(func(tx *bolt.Tx) error {
return tx.Bucket(bucketType.Name()).ForEach(visitor)
})
}
func (c *client) Close() error {
return c.db.Close()
}
Etcd代码
在Etcd代码中实现了读写分离,可以看到Etcd的数据读写都是通过boltdb原生的事务实现的,同时为了减少磁盘写入,batch批量进行写操作。同时Etcd增加了对boltdb的Cache功能,在Cache中读写数据。
下面我们粗略看一下Etcd代码:
1,Backend接口完成对boltdb的操作
2,其中ReadTx接口完成boltdb的读操作
3,其中BatchTx接口完成boltdb的批量写操作
Backend接口
Etcd定义了一个Backend接口来完成boltdb的操作,其中包含ReadTx接口实现只读事务,BatchTx接口实现读写事务。
可以看到在构造backend时:
1,readTx有两个buffer,一个是txReadBuffer,一个是txReadBufferCache,这两个buffer一个用于ReadTx,一个用于ConcurrentReadTx,txReadBufferCache是对txReadBuffer的镜像。
2,batchTx是使用的batchTxBuffered,也就是写操作调用batchTxBuffered的方法,下面介绍batchTxBuffered方法时可以看到,batchTxBuffered的写操作都是先写boltdb,再写buffer,同时还会将buffer的数据同步到readTx的txReadBuffer中去。
3,backend的run方法中,会有一个定时器,根据batchInterval定时将批量写操作进行提交写入磁盘,同时关闭时也进行提交。下面介绍commit时,可以发现有三种情况会提交磁盘:1,batchInterval时间间隔到期。2,BatchLimit写操作数量达到限制。3,关闭时。
type Backend interface {
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx
Snapshot() Snapshot
Hash(ignores func(bucketName, keyName []byte) bool) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
// Use SizeInUse() instead for the actual DB size.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
}
func newBackend(bcfg BackendConfig) *backend {
b := &backend{
bopts: bopts,
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
readTx: &readTx{
baseReadTx: baseReadTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map[BucketID]*bucketBuffer)},
bufVersion: 0,
},
buckets: make(map[BucketID]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
},
},
txReadBufferCache: txReadBufferCache{
mu: sync.Mutex{},
bufVersion: 0,
buf: nil,
},
}
b.batchTx = newBatchTxBuffered(b)
go b.run()
return b
}
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
case <-t.C:
case <-b.stopc:
b.batchTx.CommitAndStop()
return
}
if b.batchTx.safePending() != 0 {
b.batchTx.Commit()
}
t.Reset(b.batchInterval)
}
}
ReadTx只读接口
baseReadTx实现了ReadTx接口,可以看到baseReadTx中有两个读写锁,用于保护对txReadBuffer和boltdb的数据访问。txReadBuffer用于是上面提到的Cache缓存。
type ReadTx interface {
Lock()
Unlock()
RLock()
RUnlock()
UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
}
type baseReadTx struct {
// mu protects accesses to the txReadBuffer
mu sync.RWMutex
buf txReadBuffer
// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
// txMu protects accesses to buckets and tx on Range requests.
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[BucketID]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
}
UnsafeForEach方法
实现对bucket遍历调用visitor处理数据。
步骤主要为:
1,从buf中遍历bucket数据,调用getDups将数据写入到dups Map中。
2,从boltdb中遍历bucket数据,调用visitNoDup将不在buf中的数据调用visitor进行数据处理。
3,再次从buf中遍历bucket数据,调用visitor进行数据处理。
func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
dups := make(map[string]struct{})
getDups := func(k, v []byte) error {
dups[string(k)] = struct{}{}
return nil
}
visitNoDup := func(k, v []byte) error {
if _, ok := dups[string(k)]; ok {
return nil
}
return visitor(k, v)
}
if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
return err
}
baseReadTx.txMu.Lock()
err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
baseReadTx.txMu.Unlock()
if err != nil {
return err
}
return baseReadTx.buf.ForEach(bucket, visitor)
}
UnsafeRange方法
UnsafeRange实现获取bucket中单条或多条数据
步骤主要为:
1,从buf中获取bucket的数据
2,从boltdb中获取bucket的数据
3,合并buf和boltdb数据
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
if endKey == nil {
// forbid duplicates for single keys
limit = 1
}
if limit <= 0 {
limit = math.MaxInt64
}
if limit > 1 && !bucketType.IsSafeRangeBucket() {
panic("do not use unsafeRange on non-keys bucket")
}
keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
if int64(len(keys)) == limit {
return keys, vals
}
// find/cache bucket
bn := bucketType.ID()
baseReadTx.txMu.RLock()
bucket, ok := baseReadTx.buckets[bn]
baseReadTx.txMu.RUnlock()
lockHeld := false
if !ok {
baseReadTx.txMu.Lock()
lockHeld = true
bucket = baseReadTx.tx.Bucket(bucketType.Name())
baseReadTx.buckets[bn] = bucket
}
// ignore missing bucket since may have been created in this batch
if bucket == nil {
if lockHeld {
baseReadTx.txMu.Unlock()
}
return keys, vals
}
if !lockHeld {
baseReadTx.txMu.Lock()
}
c := bucket.Cursor()
baseReadTx.txMu.Unlock()
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
}
BatchTx接口
batchTx,batchTxBuffered实现BatchTx接口,分别针对boltdb和buffer数据的操作,可以看到batchTx中有一个tx事务指针,有一个sync.Mutex互斥锁,有一个pending计数器,这个pending计数器用于写操作的计数,每一次put/delete操作pending++,当数量到达backend.batchLimit时,会触发一次commit。
type BatchTx interface {
ReadTx
UnsafeCreateBucket(bucket Bucket)
UnsafeDeleteBucket(bucket Bucket)
UnsafePut(bucket Bucket, key []byte, value []byte)
UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
UnsafeDelete(bucket Bucket, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
}
type batchTx struct {
sync.Mutex
tx *bolt.Tx
backend *backend
pending int
}
type batchTxBuffered struct {
batchTx
buf txWriteBuffer
}
Commit提交
上面提到Etcd都是使用原生的boltdb事务来进行数据操作的,并且是批量写入磁盘的。在Etcd构造backend实例时会第一次调用commit,之后Etcd每次调用commit时,会根据stop参数判断,只要不是关闭,都会调用begin再开启一个新的事务赋值给batchTx.tx。这样Etcd的操作都在一个batch事务里。
func (t *batchTx) Commit() {
t.Lock()
t.commit(false)
t.Unlock()
}
func (t *batchTxBuffered) Commit() {
t.Lock()
t.commit(false)
t.Unlock()
}
func (t *batchTx) CommitAndStop() {
t.Lock()
t.commit(true)
t.Unlock()
}
func (t *batchTx) commit(stop bool) {
// commit the last tx
if t.tx != nil {
if t.pending == 0 && !stop {
return
}
start := time.Now()
// gofail: var beforeCommit struct{}
err := t.tx.Commit()
// gofail: var afterCommit struct{}
rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
commitSec.Observe(time.Since(start).Seconds())
atomic.AddInt64(&t.backend.commits, 1)
t.pending = 0
if err != nil {
t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
}
}
if !stop {
t.tx = t.backend.begin(true)
}
}
batchTxBuffered的Put方法
写入操作都会同时写buffer和boltdb
func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafePut(bucket, key, value)
t.buf.put(bucket, key, value)
}
func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}
batchTxBuffered的Unlock方法
主要调用writeback实现从写buffer到读buffer的数据同步
func (t *batchTxBuffered) Unlock() {
if t.pending != 0 {
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
t.commit(false)
}
}
t.batchTx.Unlock()
}
总结一下,Etcd对boltdb使用更深入,自已管理事务,实现读写缓存,读写缓存之间进行同步,提高了性能,减少了磁盘I/O。
至此,我们了解了golang操作的boltdb方法,对boltdb的知识有了一些初步了解。我们也要开始使用boltdb作为我们的客户端数据库了。