From 7e093163dceacd27f21a52fde328a9377f67ceea Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 11 May 2022 07:43:14 +0800 Subject: [PATCH 1/3] fix: commit table in mem and file --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 76 +++++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +- source/dnode/vnode/src/tsdb/tsdbSma.c | 1 - 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 1315963090..7429d74dad 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static void tsdbResetCommitFile(SCommitH *pCommith); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbComparKeyBlock(const void *arg1, const void *arg2); static int tsdbWriteBlockInfo(SCommitH *pCommih); @@ -349,7 +350,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { return -1; } - +#if 0 // Loop to commit each table data for (int tid = 0; tid < pCommith->niters; tid++) { SCommitIter *pIter = pCommith->iters + tid; @@ -363,6 +364,46 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { return -1; } } +#endif + // Loop to commit each table data in mem and file + int mIter = 0, fIter = 0; + int32_t nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); + + while (true) { + SBlockIdx *pIdx = NULL; + SCommitIter *pIter = NULL; + if (mIter < pCommith->niters) { + pIter = pCommith->iters + mIter; + if (fIter < nBlkIdx) { + pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); + } + } else if (fIter < nBlkIdx) { + pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); + } else { + break; + } + if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->uid <= pIdx->uid))) { + if (tsdbCommitToTable(pCommith, mIter) < 0) { + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + + if (pIdx && (pIter->pTable->uid == pIdx->uid)) { + ++fIter; + } + ++mIter; + } else if (pIdx) { + if (tsdbMoveBlkIdx(pCommith, pIdx) < 0) { + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + ++fIter; + } + } if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < 0) { @@ -838,6 +879,39 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { return 0; } +static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { + SReadH *pReadh = &pCommith->readh; + int nBlocks = pIdx->numOfBlocks; + int bidx = 0; + SBlock *pBlock; + + tsdbResetCommitTable(pCommith); + + pReadh->pBlkIdx = pIdx; + + if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { + return -1; + } + + while (bidx < nBlocks) { + if (tsdbMoveBlock(pCommith, bidx) < 0) { + return -1; + } + ++bidx; + } + + STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL}; + TSDB_COMMIT_TABLE(pCommith) = &table; + + if (tsdbWriteBlockInfo(pCommith) < 0) { + tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + return -1; + } + + return 0; +} + static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 60f2f74f5b..4638066935 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -372,13 +372,13 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, } if (level == TSDB_RETENTION_L0) { - tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); + tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L0); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { - tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); + tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L1); return VND_RSMA1(pVnode); } else { - tsdbDebug("%p rsma level %d is selected to query\n", pReadHandle, level); + tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L2); return VND_RSMA2(pVnode); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 32051c2de4..61515a3be1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -1943,7 +1943,6 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { - tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb)); return TSDB_CODE_SUCCESS; } From a8e91544252970b6de43239aaa6695e51d0084c2 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 11 May 2022 07:48:45 +0800 Subject: [PATCH 2/3] enh: format optimization --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 7429d74dad..26b1dc7274 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -366,8 +366,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { } #endif // Loop to commit each table data in mem and file - int mIter = 0, fIter = 0; - int32_t nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); + int mIter = 0, fIter = 0; + int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); while (true) { SBlockIdx *pIdx = NULL; From 8cb2edb25ca85b835a93db43b0a0a35d058f2009 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 11 May 2022 07:55:01 +0800 Subject: [PATCH 3/3] enh: code optimization --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 26b1dc7274..5f54e0cb48 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -883,7 +883,6 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { SReadH *pReadh = &pCommith->readh; int nBlocks = pIdx->numOfBlocks; int bidx = 0; - SBlock *pBlock; tsdbResetCommitTable(pCommith); @@ -895,6 +894,8 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { while (bidx < nBlocks) { if (tsdbMoveBlock(pCommith, bidx) < 0) { + tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); return -1; } ++bidx;