So far I don't see anything wrong here. When it runs continuously, does the latency fluctuate periodically or increase steadily?
It stays quite stable around the average. Pulling the monitoring metrics, I found a strange value:
tikv_client_go_batch_recv_latency_bucket{result="ok",le="4.194304e+09"} 1067
tikv_client_go_batch_recv_latency_bucket{result="ok",le="8.192e+06"} 7527
In the sleep test the latency lands in the ~4 s bucket, while the no-sleep test lands in the ~8 ms bucket. Is this the server accumulating enough requests before sending the response back?
tikv_client_go_txn_commit_backoff_seconds_bucket{le="0.001"} 24
tikv_client_go_txn_cmd_duration_seconds_bucket{scope="general",type="commit",le="0.002"} 1
tikv_client_go_txn_cmd_duration_seconds_bucket{scope="general",type="commit",le="0.004"} 23
tikv_client_go_request_seconds_bucket{scope="false",stale_read="false",store="4",type="Prewrite",le="0.002"} 6
tikv_client_go_request_seconds_bucket{scope="false",stale_read="false",store="4",type="Prewrite",le="0.004"} 23
With batch_size = 0, the 95th percentile of these buckets is around 2 milliseconds. Can you see any problem from this?
With batch size = 0, does the server side still batch requests?
The latency with batch size != 0 is very high; it looks related to batching.
tikv_client_go_request_seconds_bucket{scope="true",stale_read="false",store="0",type="StoreSafeTS",le="0.001"} 33
This metric shouldn't be on the commit path, right? Its 95th percentile latency is also around 1 ms.
- Try merging multiple write operations into one or a few batched writes, using TiKV's Batch interface to reduce network round trips and the number of Raft commits. Batching writes can significantly reduce commit latency and network overhead (see the sketch below).
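A minimal sketch of that idea, using the same txnkv API as the repro code in this thread (the helper name putBatch is illustrative, and it assumes the same imports: context and github.com/tikv/client-go/v2/txnkv). All keys go through one Begin/Set/Commit, so they share a single Prewrite/Commit round trip instead of committing per key.

// putBatch writes all key/value pairs in one optimistic transaction,
// so they share a single Prewrite/Commit round trip.
func putBatch(client *txnkv.Client, keys, values [][]byte) error {
	tx, err := client.Begin()
	if err != nil {
		return err
	}
	tx.SetPessimistic(false)
	for i := range keys {
		// All mutations are buffered in the same transaction before commit.
		if err := tx.Set(keys[i], values[i]); err != nil {
			tx.Rollback()
			return err
		}
	}
	// One commit for the whole batch instead of one per key.
	return tx.Commit(context.Background())
}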
If cmd-batch is disabled, can client-go keep using the batch interface? It looks like the server side is accumulating batches, and the latency of a single put is very high.
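For concreteness, a minimal sketch of turning off the client-side batch stream via client-go's config package. MaxBatchSize also appears in the repro code further down; the other field names mentioned in the comment are assumptions from my reading of config.TiKVClient and should be verified against your client-go version.

// Disable the client-side batch-commands stream: with MaxBatchSize = 0,
// each request is sent as an individual gRPC call instead of being batched.
config.UpdateGlobal(func(c *config.Config) {
	c.TiKVClient.MaxBatchSize = 0
	// Related knobs (names per my reading of config.TiKVClient, please verify):
	// MaxBatchWaitTime, BatchWaitSize, OverloadThreshold.
})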
I tried to reproduce this, but didn't run into the same behavior.
With sleep 1s, the gRPC p99 duration stays stable at 1.6ms/796us (prewrite/commit) (ignore the two spikes; my environment is limited and not clean, with other background tasks running).
Without sleep: ~5ms/~1.4ms (prewrite/commit)
(Left side is the sleep 1s workload; the no-sleep workload starts at 18:32:30.)
PD x 1 + TiKV x 3, v6.5.8
client go v2.0.7 (c853ddc68c41ff9dad1ed8ce11c3f062a84bf220)
package main

import (
	"context"
	"flag"
	"os"
	"strings"
	"time"

	"github.com/tikv/client-go/v2/txnkv"
)

var (
	client  *txnkv.Client
	pdAddr  = flag.String("pd", "127.0.0.1:2379", "pd address")
	doSleep = flag.Bool("sleep", false, "do sleep")
)

// initStore creates the txnkv client against the given PD address.
func initStore() {
	var err error
	client, err = txnkv.NewClient([]string{*pdAddr})
	if err != nil {
		panic(err)
	}
}

// lockPuts writes the given key/value pairs in a single optimistic transaction.
func lockPuts(keys [][]byte, values [][]byte) error {
	tx, err := client.Begin()
	if err != nil {
		return err
	}
	tx.SetPessimistic(false)
	// err = tx.LockKeysWithWaitTime(context.Background(), kv.LockNoWait, keys...)
	// if err != nil {
	// 	return err
	// }
	for i := 0; i < len(keys); i++ {
		key, val := keys[i], values[i]
		err := tx.Set(key, val)
		if err != nil {
			tx.Rollback()
			return err
		}
	}
	return tx.Commit(context.Background())
}

func main() {
	pdAddr := os.Getenv("PD_ADDR")
	if pdAddr != "" {
		os.Args = append(os.Args, "-pd", pdAddr)
	}
	flag.Parse()
	initStore()
	for {
		key := []byte("key")
		value := []byte(strings.Repeat("v", 65536))
		err := lockPuts([][]byte{key}, [][]byte{value})
		if err != nil {
			panic(err)
		}
		if *doSleep {
			time.Sleep(time.Second)
		}
	}
}
Do PD or TiKV have any special configuration?
No special configuration, just the defaults from a tiup deployment. Is yours an NVMe disk? Is 2 ms latency a normal value?
Try adding these two settings:
tx.SetEnable1PC(true)
tx.SetEnableAsyncCommit(true)
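For example, on top of the snippet above, a minimal sketch (the helper name commitWith1PC is illustrative; it assumes the same txnkv client and imports as the earlier code):

// commitWith1PC writes one key in an optimistic transaction with 1PC and
// async commit enabled before committing.
func commitWith1PC(client *txnkv.Client, key, value []byte) error {
	tx, err := client.Begin()
	if err != nil {
		return err
	}
	tx.SetPessimistic(false)
	// 1PC: a small transaction whose mutations fit in one request can commit in one phase.
	tx.SetEnable1PC(true)
	// Async commit: the commit returns earlier, with the final commit phase finished asynchronously.
	tx.SetEnableAsyncCommit(true)
	if err := tx.Set(key, value); err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit(context.Background())
}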
The 2.3ms (sleep 1s) / 1.2ms (no sleep) I measured later was already the 1PC scenario.
Can you see what the avg duration is?
Please provide a minimal runnable program that reproduces the problem, along with the corresponding PD and TiKV configuration.
package main

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"strings"
	"time"

	//"github.com/tikv/client-go/v2/kv"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/tikv/client-go/v2/config"
	"github.com/tikv/client-go/v2/metrics"
	"github.com/tikv/client-go/v2/txnkv"
)

type Config struct {
	StoreEngine    string
	DataPath       string
	TikvClientUrls string
	TikvLockKeys   bool
}

type KV struct {
	Key   []byte
	Value []byte
}

type Batch struct {
	KV
	IsDeleted bool
}

type tikvStore struct {
	client *txnkv.Client
	cfg    *Config
}

func newTikvStore(cfg *Config) (*tikvStore, error) {
	store := &tikvStore{
		cfg: cfg,
	}
	var err error
	fmt.Print(cfg.TikvClientUrls)
	addr := strings.Split(cfg.TikvClientUrls, ",")
	store.client, err = txnkv.NewClient(addr)
	if err != nil {
		return nil, err
	}
	return store, nil
}

func (t *tikvStore) Engine() string {
	return t.cfg.StoreEngine
}

// beginPessimisticTxn actually starts an optimistic transaction
// (SetPessimistic(false)) with 1PC and async commit enabled.
func (t *tikvStore) beginPessimisticTxn(ctx context.Context) (*txnkv.KVTxn, error) {
	select {
	case <-ctx.Done():
		return nil, errors.New("cancel")
	default:
	}
	txn, err := t.client.Begin()
	if err != nil {
		return nil, err
	}
	txn.SetPessimistic(false)
	txn.SetEnable1PC(true)
	txn.SetEnableAsyncCommit(true)
	return txn, nil
}

func (t *tikvStore) WriteBatch(ctx context.Context, b []*Batch) (err error) {
	tx, err := t.beginPessimisticTxn(ctx)
	if err != nil {
		return err
	}
	if t.cfg.TikvLockKeys {
		keys := make([][]byte, 0, len(b))
		for _, kv := range b {
			keys = append(keys, kv.Key)
		}
		// Note: the lock call itself (cf. the commented-out LockKeysWithWaitTime
		// in the earlier snippet) is omitted here, so keys is collected but never
		// locked and the err check below is a no-op.
		if err != nil {
			return err
		}
	}
	defer func() {
		if err != nil {
			txErr := tx.Rollback()
			if txErr != nil {
				panic(txErr)
			}
		}
	}()
	for _, kv := range b {
		if len(kv.Key) == 0 {
			return errors.New("empty key is not supported")
		}
		if kv.IsDeleted {
			err := tx.Delete(kv.Key)
			if err != nil {
				return err
			}
		} else {
			err := tx.Set(kv.Key, kv.Value)
			if err != nil {
				return err
			}
		}
	}
	err = tx.Commit(ctx)
	return err
}

var (
	pdAddr   = flag.String("pd", "127.0.0.1:2379", "pd address")
	action   = flag.String("action", "put", "put or lockPut")
	count    = flag.Int("count", 10000, "loop count")
	valueLen = flag.Int("valueLen", 2048, "value length")
)

func main() {
	flag.Parse()
	if *pdAddr == "" {
		panic("pd nil")
	}

	// Expose client-go metrics on /metrics.
	metrics.InitMetrics("tikv", "client_go")
	metrics.RegisterMetrics()
	http.Handle("/metrics", promhttp.Handler())
	httpaddr := fmt.Sprintf("%s:%d", "10.22.126.33", 8080)
	go func() {
		http.ListenAndServe(httpaddr, nil)
	}()

	// Disable the client-side batch-commands stream.
	config.UpdateGlobal(func(c *config.Config) {
		c.TiKVClient.MaxBatchSize = 0
	})

	cfg := &Config{
		StoreEngine:    "tikv",
		TikvClientUrls: *pdAddr,
	}
	if *action == "lockPut" {
		cfg.TikvLockKeys = true
	} else {
		cfg.TikvLockKeys = false
	}
	s, err := newTikvStore(cfg)
	if err != nil {
		panic(err)
	}

	value := make([]byte, *valueLen)
	for i := 0; i < *valueLen; i++ {
		value[i] = byte(i)
	}
	b := &Batch{
		KV: KV{
			Key:   []byte("key1dasdasdasddddddddd"),
			Value: value,
		},
		IsDeleted: false,
	}
	b1 := &Batch{
		KV: KV{
			Key:   []byte("key2dasdasdasddddddddd"),
			Value: value,
		},
		IsDeleted: false,
	}

	// Background writer on a second key; errors are ignored.
	go func() {
		for {
			s.WriteBatch(context.Background(), []*Batch{b1})
		}
	}()

	var total int64 = 0
	for i := 0; i < *count; i++ {
		start := time.Now()
		err := s.WriteBatch(context.Background(), []*Batch{b})
		if err != nil {
			panic(err)
		}
		end := time.Since(start)
		fmt.Println("cost", end.Microseconds())
		total += end.Microseconds()
		//time.Sleep(1*time.Second)
	}
	fmt.Printf("avg: %d\n", total/int64(*count))
}
Default configuration for v6.5.8; nothing shows up (the files contain only the auto-generated headers):
# WARNING: This file is auto-generated. Do not edit! All your modification will be overwritten!
# You can use 'tiup cluster edit-config' and 'tiup cluster reload' to update the configuration
# All configuration items you want to change can be added to:
# server_configs:
# tikv:
# aa.b1.c3: value
# aa.b2.c4: value
# WARNING: This file is auto-generated. Do not edit! All your modification will be overwritten!
# You can use 'tiup cluster edit-config' and 'tiup cluster reload' to update the configuration
# All configuration items you want to change can be added to:
# server_configs:
# pd:
# aa.b1.c3: value
# aa.b2.c4: value
By the way, could I ask what you use to deploy for testing? When I deploy with TiOperator, PD never manages to start.