With tikv go-client, transaction request latency doubles when requests are issued at 1-second intervals

So far I can't see anything wrong with this. When it runs continuously, does the latency fluctuate periodically or grow gradually?

It stays stable around the average. Loading the monitoring metrics, I found a strange value:

tikv_client_go_batch_recv_latency_bucket{result="ok",le="4.194304e+09"} 1067 
tikv_client_go_batch_recv_latency_bucket{result="ok",le="8.192e+06"} 7527   

The test latency with sleep is about 4 seconds, while without sleep it is about 8 milliseconds (reading the le bounds as nanoseconds: 4.194304e9 ns ≈ 4.2 s, 8.192e6 ns ≈ 8.2 ms). Does this mean the server side waits to accumulate requests before replying?

tikv_client_go_txn_commit_backoff_seconds_bucket{le="0.001"} 24
tikv_client_go_txn_cmd_duration_seconds_bucket{scope="general",type="commit",le="0.002"} 1
tikv_client_go_txn_cmd_duration_seconds_bucket{scope="general",type="commit",le="0.004"} 23
tikv_client_go_request_seconds_bucket{scope="false",stale_read="false",store="4",type="Prewrite",le="0.002"} 6
tikv_client_go_request_seconds_bucket{scope="false",stale_read="false",store="4",type="Prewrite",le="0.004"} 23

With batch_size = 0, the 95th percentile of these buckets is around 2 ms. Can you see any problem from that?
With batch size = 0, does the server side still accumulate batches?
With batch size != 0 the latency is much higher, which looks related to batching.

tikv_client_go_request_seconds_bucket{scope="true",stale_read="false",store="0",type="StoreSafeTS",le="0.001"} 33 

This metric shouldn't be on the commit path, right? And its 95th percentile latency is already 1 ms.

  1. Try merging multiple writes into one or a few batched writes, using TiKV's batch interface to cut network round trips and Raft commits. Batching can significantly reduce commit latency and network overhead, as sketched below.
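A minimal sketch of that idea, using the same txnkv API that appears later in this thread (the function name is illustrative): buffer all writes in one transaction and pay prewrite/commit once per batch.

import (
	"context"

	"github.com/tikv/client-go/v2/txnkv"
)

// batchPuts writes all key/value pairs in a single transaction, so the
// network round trips and Raft commits are paid once per batch, not per key.
func batchPuts(client *txnkv.Client, keys, values [][]byte) error {
	tx, err := client.Begin()
	if err != nil {
		return err
	}
	for i := range keys {
		if err := tx.Set(keys[i], values[i]); err != nil {
			_ = tx.Rollback()
			return err
		}
	}
	return tx.Commit(context.Background())
}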

With cmd-batch disabled, can client-go keep using the batch interface? The server side seems to be accumulating batches, and single-put latency is very high.
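For reference, client-go's own request batching can be turned off through config.UpdateGlobal; the repro program later in this thread sets the same knob:

// a sketch, same call as in the repro below: MaxBatchSize = 0 disables
// client-side batching of gRPC requests to TiKV
config.UpdateGlobal(func(c *config.Config) {
	c.TiKVClient.MaxBatchSize = 0
})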

I tried to reproduce this, but did not hit the same behavior.

With sleep 1s, the gRPC p99 duration is stable at 1.6ms/796us (prewrite/commit) (ignore the two spikes; my setup is limited and the environment isn't clean, with other background tasks running).
Without sleep: ~5ms/~1.4ms (prewrite/commit).

(Left side is the sleep-1s run; the no-sleep workload starts at 18:32:30.)

PD x 1 + TiKV x 3, v6.5.8
client go v2.0.7 (c853ddc68c41ff9dad1ed8ce11c3f062a84bf220)

package main

import (
	"context"
	"flag"
	"os"
	"strings"
	"time"

	"github.com/tikv/client-go/v2/txnkv"
)

var (
	client  *txnkv.Client
	pdAddr  = flag.String("pd", "127.0.0.1:2379", "pd address")
	doSleep = flag.Bool("sleep", false, "do sleep")
)

// initStore creates the txnkv client from the PD address.
func initStore() {
	var err error
	client, err = txnkv.NewClient([]string{*pdAddr})
	if err != nil {
		panic(err)
	}
}

func lockPuts(keys [][]byte, values [][]byte) error {
	tx, err := client.Begin()
	if err != nil {
		return err
	}
	tx.SetPessimistic(false)
	// err = tx.LockKeysWithWaitTime(context.Background(), kv.LockNoWait, keys...)
	// if err != nil {
	// 		return err
	// }

	for i := 0; i < len(keys); i++ {
		key, val := keys[i], values[i]
		err := tx.Set(key, val)
		if err != nil {
			tx.Rollback()
			return err
		}
	}

	// propagate the commit error instead of dropping it
	return tx.Commit(context.Background())
}

func main() {
	pdAddr := os.Getenv("PD_ADDR")
	if pdAddr != "" {
		os.Args = append(os.Args, "-pd", pdAddr)
	}
	flag.Parse()
	initStore()

	for {
		key := []byte("key")
		value := []byte(strings.Repeat("v", 65536))

		err := lockPuts([][]byte{key}, [][]byte{value})
		if err != nil {
			panic(err)
		}

		if *doSleep {
			time.Sleep(time.Second)
		}
	}
}
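With the flags defined above it can be run as, e.g.:

go run main.go -pd 127.0.0.1:2379          # no sleep, back-to-back transactions
go run main.go -pd 127.0.0.1:2379 -sleep   # 1s sleep between transactions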

Do PD or TiKV have any special configuration?


No special configuration, just the defaults from a tiup deployment. Is yours an NVMe disk? Is 2 ms a normal latency value there?
Try adding these two settings:

tx.SetEnable1PC(true)
tx.SetEnableAsyncCommit(true)
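For context, a minimal sketch of where these two calls sit, right after Begin (same txnkv API as the repro above):

txn, err := client.Begin()
if err != nil {
	panic(err)
}
txn.SetPessimistic(false)
txn.SetEnable1PC(true)         // commit in a single phase when the txn qualifies
txn.SetEnableAsyncCommit(true) // commit returns without waiting for the final phase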

The 2.3ms (sleep 1s) / 1.2ms (no sleep) I measured later is the 1PC scenario.

Can you see what the avg duration is?
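If only the exported histograms are at hand, the average can be derived from the _sum/_count series that every Prometheus histogram exposes, e.g. over a 1-minute window:

sum(rate(tikv_client_go_txn_cmd_duration_seconds_sum{type="commit"}[1m])) / sum(rate(tikv_client_go_txn_cmd_duration_seconds_count{type="commit"}[1m]))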

Please provide minimal runnable code that reproduces the problem, plus the corresponding PD and TiKV configuration.

package main

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"strings"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/kv"
	"github.com/tikv/client-go/v2/metrics"
	"github.com/tikv/client-go/v2/txnkv"
)

type Config struct {
	StoreEngine    string
	DataPath       string
	TikvClientUrls string
	TikvLockKeys   bool
}

type KV struct {
	Key   []byte
	Value []byte
}

type Batch struct {
	KV
	IsDeleted bool
}

type tikvStore struct {
	client *txnkv.Client
	cfg    *Config
}

func newTikvStore(cfg *Config) (*tikvStore, error) {
	store := &tikvStore{
		cfg: cfg,
	}
	var err error
	fmt.Print(cfg.TikvClientUrls)
	addr := strings.Split(cfg.TikvClientUrls, ",")
	store.client, err = txnkv.NewClient(addr)
	if err != nil {
		return nil, err
	}

	return store, nil
}

func (t *tikvStore) Engine() string {
	return t.cfg.StoreEngine
}

// beginPessimisticTxn starts a transaction; note that despite its name it
// runs in optimistic mode (SetPessimistic(false)) with 1PC and async commit.
func (t *tikvStore) beginPessimisticTxn(ctx context.Context) (*txnkv.KVTxn, error) {
	select {
	case <-ctx.Done():
		return nil, errors.New("cancel")
	default:
		break
	}

	txn, err := t.client.Begin()
	if err != nil {
		return nil, err
	}
	txn.SetPessimistic(false)
	txn.SetEnable1PC(true)
	txn.SetEnableAsyncCommit(true)

	return txn, nil
}

func (t *tikvStore) WriteBatch(ctx context.Context, b []*Batch) (err error) {
	tx, err := t.beginPessimisticTxn(ctx)
	if err != nil {
		return err
	}

	// register the rollback before any early return so a failed lock or write
	// does not leak the transaction
	defer func() {
		if err != nil {
			txErr := tx.Rollback()
			if txErr != nil {
				panic(txErr)
			}
		}
	}()

	if t.cfg.TikvLockKeys {
		keys := make([][]byte, 0, len(b))
		for _, item := range b {
			keys = append(keys, item.Key)
		}
		// the original snippet collected keys without using them; this restores
		// the LockKeysWithWaitTime call that the first repro left commented out
		err = tx.LockKeysWithWaitTime(ctx, kv.LockNoWait, keys...)
		if err != nil {
			return err
		}
	}

	for _, kv := range b {
		if len(kv.Key) == 0 {
			return errors.New("empty key is not supported")
		}

		if kv.IsDeleted {
			err := tx.Delete(kv.Key)
			if err != nil {
				return err
			}
		} else {
			err := tx.Set(kv.Key, kv.Value)
			if err != nil {
				return err
			}
		}
	}

	err = tx.Commit(ctx)

	return err
}

var (
	pdAddr   = flag.String("pd", "127.0.0.1:2379", "pd address")
	action   = flag.String("action", "put", "put or lockPut")
	count    = flag.Int("count", 10000, "loop count")
	valueLen = flag.Int("valueLen", 2048, "value length")
)


func main() {
	flag.Parse()

	if *pdAddr == "" {
		panic("pd nil")
	}

	metrics.InitMetrics("tikv", "client_go")
	metrics.RegisterMetrics()
	http.Handle("/metrics", promhttp.Handler())


	httpaddr := fmt.Sprintf("%s:%d", "10.22.126.33", 8080)
	go func() {
		http.ListenAndServe(httpaddr, nil)
	}()

	// disable client-go's request batching entirely; this is the
	// batch-size = 0 case discussed above
	config.UpdateGlobal(func(c *config.Config) {
		c.TiKVClient.MaxBatchSize = 0
	})

	cfg := &Config{
		StoreEngine: "tikv",
		TikvClientUrls: *pdAddr,
	}

	if *action == "lockPut" {
		cfg.TikvLockKeys = true
	} else {
		cfg.TikvLockKeys = false
	}

	s, err := newTikvStore(cfg)
	if err != nil {
		panic(err)
	}

	value := make([]byte, *valueLen)
	for i := 0; i < *valueLen; i++ {
		value[i] = byte(i)
	}
	b := &Batch{
		KV: KV{
			Key:   []byte("key1dasdasdasddddddddd"),
			Value: value,
		},
		IsDeleted: false,
	}

	b1 := &Batch{
		KV: KV{
			Key:   []byte("key2dasdasdasddddddddd"),
			Value: value,
		},
		IsDeleted: false,
	}

	// concurrent writer on a second key, looping as fast as possible
	go func() {
		for {
			_ = s.WriteBatch(context.Background(), []*Batch{b1})
		}
	}()

	var total int64 = 0
	for i := 0; i < *count; i++ {
		start := time.Now()
		err := s.WriteBatch(context.Background(), []*Batch{b})
		if err != nil {
			panic(err)
		}
		elapsed := time.Since(start)
		fmt.Println("cost", elapsed.Microseconds())
		total += elapsed.Microseconds()
		//time.Sleep(1*time.Second)
	}

	fmt.Println("avg: %d", total/int64(*count))
}
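With the flags above, a run looks like:

go run main.go -pd 127.0.0.1:2379 -action put -count 10000 -valueLen 2048

and the client metrics are then scrapable at http://10.22.126.33:8080/metrics (the address is hardcoded in the program).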

These are the v6.5.8 defaults; the config shows nothing beyond the auto-generated header:

# WARNING: This file is auto-generated. Do not edit! All your modification will be overwritten!
# You can use 'tiup cluster edit-config' and 'tiup cluster reload' to update the configuration
# All configuration items you want to change can be added to:
# server_configs:
#   tikv:
#     aa.b1.c3: value
#     aa.b2.c4: value
# WARNING: This file is auto-generated. Do not edit! All your modification will be overwritten!
# You can use 'tiup cluster edit-config' and 'tiup cluster reload' to update the configuration
# All configuration items you want to change can be added to:
# server_configs:
#   pd:
#     aa.b1.c3: value
#     aa.b2.c4: value

Could I ask how you deployed your test cluster? When I deploy with TiDB Operator, PD never comes up.