diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 81b42f43e8..b34facb3ed 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -223,7 +223,6 @@ static void tsdbDataIterClose(STsdbDataIter *pIter) { tBlockDataDestroy(&pSttDIter->bData); taosArrayDestroy(pSttDIter->aSttBlk); - tsdbDataFReaderClose(&pSttDIter->pReader); } else { ASSERT(0); } @@ -322,13 +321,34 @@ _exit: return code; } +static void tsdbEndCompact(STsdbCompactor *pCompactor) { + tsdbFSDestroy(&pCompactor->fs); + tBlockDataDestroy(&pCompactor->bData); + tDestroyTSchema(pCompactor->tbSkm.pTSchema); + pCompactor->tbSkm.pTSchema = NULL; + taosArrayDestroy(pCompactor->aBlockIdx); + tMapDataClear(&pCompactor->mDataBlk); + taosArrayDestroy(pCompactor->aSttBlk); +} + static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; STsdb *pTsdb = pCompactor->pTsdb; - // TODO + code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + taosThreadRwlockWrlock(&pTsdb->rwLock); + + code = tsdbFSCommit(pTsdb); + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosThreadRwlockUnlock(&pTsdb->rwLock); _exit: if (code) { @@ -691,8 +711,6 @@ static int32_t tsdbCompactWriteDataBlk(STsdbCompactor *pCompactor) { TSDB_CHECK_CODE(code, lino, _exit); tMapDataReset(&pCompactor->mDataBlk); - pCompactor->tableId.suid = 0; - pCompactor->tableId.uid = 0; _exit: if (code) { @@ -737,7 +755,11 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } else { - if (pCompactor->bData.suid != pRowInfo->suid) { // different super table + if ((pCompactor->tableId.suid != pRowInfo->suid) || // different super table + (pCompactor->tableId.uid != pRowInfo->uid && + (pRowInfo->suid == 0 || + pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)) // different table + ) { code = tsdbCompactWriteBlockData(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); @@ -747,24 +769,6 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); - } else if (pCompactor->bData.uid != pRowInfo->uid) { // different table - if (pRowInfo->suid) { - if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) { - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - // different normal table - code = tsdbCompactWriteBlockData(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCompactWriteDataBlk(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, - pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } } } @@ -772,6 +776,9 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); TSDB_CHECK_CODE(code, lino, _exit); + pCompactor->tableId.suid = pRowInfo->suid; + pCompactor->tableId.uid = pRowInfo->uid; + // check if block data is full if (pCompactor->bData.nRow >= pCompactor->maxRows) { code = tsdbCompactWriteBlockData(pCompactor); @@ -814,5 +821,6 @@ _exit: } else { tsdbCommitCompact(pCompactor); } + tsdbEndCompact(pCompactor); return code; }