enh: not allow to insert if Tsdb applied lagging behind too far

This commit is contained in:
Benguang Zhao 2023-03-09 18:56:12 +08:00
parent 1392711e66
commit 552d0bc8a0
4 changed files with 15 additions and 4 deletions

View File

@ -541,7 +541,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913) #define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914) #define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal #define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) // #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq // tq

View File

@ -2165,8 +2165,8 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// append to log buffer // append to log buffer
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
terrno = TSDB_CODE_SYN_BUFFER_FULL; ASSERT(terrno != 0);
(void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, TSDB_CODE_SYN_BUFFER_FULL); (void)syncLogFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
return -1; return -1;
} }

View File

@ -48,7 +48,16 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncIndex index = pEntry->index; SyncIndex index = pEntry->index;
if (index - pBuf->startIndex >= pBuf->size) { if (index - pBuf->startIndex >= pBuf->size) {
sError("vgId:%d, failed to append due to sync log buffer full. index:%" PRId64 "", pNode->vgId, index); terrno = TSDB_CODE_SYN_BUFFER_FULL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
goto _err;
}
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (index - appliedIndex >= pBuf->size) {
terrno = TSDB_CODE_SYN_WRITE_STALL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
goto _err; goto _err;
} }

View File

@ -422,6 +422,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RESTORING, "Sync is restoring")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot msg")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq //tq