From a1ca9466cdba97d6d04a994e5bd548e34e7df30e Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 20 Apr 2023 10:23:51 +0800 Subject: [PATCH] enh: adjust size limit of appyQ and negotiationWin --- include/util/taoserror.h | 1 + include/util/tdef.h | 5 ++++- source/libs/sync/src/syncPipeline.c | 10 +++++++++- source/util/src/terror.c | 1 + 4 files changed, 15 insertions(+), 2 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ab89466a19..0d9292cc6b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -547,6 +547,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) #define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917) +#define TSDB_CODE_SYN_NEGO_WIN_EXCEEDED TAOS_DEF_ERROR_CODE(0, 0X0918) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/include/util/tdef.h b/include/util/tdef.h index 2f86395dad..9dd454bb68 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -285,8 +285,11 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_ROLE_VNODE 2 #define TSDB_MAX_REPLICA 5 + #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 -#define TSDB_SYNC_LOG_BUFFER_RETENTION (TSDB_SYNC_LOG_BUFFER_SIZE >> 4) +#define TSDB_SYNC_LOG_BUFFER_RETENTION 256 +#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 +#define TSDB_SYNC_NEGOTIATION_WIN 512 #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 6bebef77dc..5e6058ef03 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -53,8 +53,15 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _err; } + if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) { + terrno = TSDB_CODE_SYN_NEGO_WIN_EXCEEDED; + sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(), + index, pBuf->commitIndex); + goto _err; + } + SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); - if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= pBuf->size) { + if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) { terrno = TSDB_CODE_SYN_WRITE_STALL; sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64, pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex); @@ -83,6 +90,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt _err: syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); + taosMsleep(1); return -1; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 002d605793..34b09761c8 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -426,6 +426,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync leader is restor TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGO_WIN_EXCEEDED, "Sync negotiation win exceeded") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") //tq