diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 00cefd3fc2..88a6dd232e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -15,692 +15,6 @@ #include "tsdb.h" -#if 0 - -#define TSDB_ITER_TYPE_MEM 0x0 -#define TSDB_ITER_TYPE_DAT 0x1 -#define TSDB_ITER_TYPE_STT 0x2 - -typedef struct { -} SMemDIter; - -typedef struct { - SDataFReader *pReader; - SArray *aBlockIdx; // SArray - SMapData mDataBlk; // SMapData - SBlockData bData; - int32_t iBlockIdx; - int32_t iDataBlk; - int32_t iRow; -} SDataDIter; - -typedef struct { - SDataFReader *pReader; - int32_t iStt; - SArray *aSttBlk; // SArray - SBlockData bData; - int32_t iSttBlk; - int32_t iRow; -} SSttDIter; - -typedef struct STsdbDataIter { - struct STsdbDataIter *next; - - int32_t flag; - SRowInfo rowInfo; - SRBTreeNode n; - char handle[]; -} STsdbDataIter; - -#define TSDB_DATA_ITER_FROM_RBTN(N) ((STsdbDataIter *)((char *)N - offsetof(STsdbDataIter, n))) - -#define TSDB_FLG_DEEP_COMPACT 0x1 - -// ITER ========================= -static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId); - -static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { - const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n)); - const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n)); - - return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo); -} - -static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) { - int32_t code = 0; - int32_t lino = 0; - - STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SMemDIter)); - if (pIter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - // TODO - -_exit: - if (code) { - *ppIter = NULL; - } else { - *ppIter = pIter; - } - return code; -} - -static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) { - int32_t code = 0; - int32_t lino = 0; - - STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SDataDIter)); - if (NULL == pIter) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - pIter->flag = TSDB_ITER_TYPE_DAT; - - SDataDIter *pDataDIter = (SDataDIter *)pIter->handle; - pDataDIter->pReader = pReader; - pDataDIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pDataDIter->aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - code = tsdbReadBlockIdx(pReader, pDataDIter->aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit; - - // TODO - code = tBlockDataCreate(&pDataDIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pDataDIter->iBlockIdx = -1; - pDataDIter->iDataBlk = 0; - pDataDIter->iRow = 0; - - code = tsdbDataIterNext(pIter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - _clear_exit: - *ppIter = NULL; - if (pIter) { - tBlockDataDestroy(&pDataDIter->bData); - tMapDataClear(&pDataDIter->mDataBlk); - taosArrayDestroy(pDataDIter->aBlockIdx); - taosMemoryFree(pIter); - } - } else { - *ppIter = pIter; - } - return code; -} - -static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) { - int32_t code = 0; - int32_t lino = 0; - - STsdbDataIter *pIter = (STsdbDataIter *)taosMemoryCalloc(1, sizeof(*pIter) + sizeof(SSttDIter)); - if (pIter == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - pIter->flag = TSDB_ITER_TYPE_STT; - - SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; - pSttDIter->pReader = pReader; - pSttDIter->iStt = iStt; - pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pSttDIter->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pSttDIter->aSttBlk) == 0) goto _clear_exit; - - code = tBlockDataCreate(&pSttDIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pSttDIter->iSttBlk = -1; - pSttDIter->iRow = -1; - - code = tsdbDataIterNext(pIter, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - _clear_exit: - *ppIter = NULL; - if (pIter) { - tBlockDataDestroy(&pSttDIter->bData); - taosArrayDestroy(pSttDIter->aSttBlk); - taosMemoryFree(pIter); - } - } else { - *ppIter = pIter; - } - return code; -} - -static void tsdbDataIterClose(STsdbDataIter *pIter) { - if (pIter == NULL) return; - - if (pIter->flag & TSDB_ITER_TYPE_MEM) { - ASSERT(0); - } else if (pIter->flag & TSDB_ITER_TYPE_DAT) { - ASSERT(0); - } else if (pIter->flag & TSDB_ITER_TYPE_STT) { - SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; - - tBlockDataDestroy(&pSttDIter->bData); - taosArrayDestroy(pSttDIter->aSttBlk); - } else { - ASSERT(0); - } - - taosMemoryFree(pIter); -} - -static int32_t tsdbDataIterNext(STsdbDataIter *pIter, TABLEID *pExcludeTableId) { - int32_t code = 0; - int32_t lino = 0; - - if (pIter->flag & TSDB_ITER_TYPE_MEM) { - // TODO - ASSERT(0); - } else if (pIter->flag & TSDB_ITER_TYPE_DAT) { - // TODO - ASSERT(0); - } else if (pIter->flag & TSDB_ITER_TYPE_STT) { - SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; - - for (;;) { - if (++pSttDIter->iRow >= pSttDIter->bData.nRow) { - for (;;) { - if (++pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) { - SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk); - - // check exclusion - if (pExcludeTableId) { - if (pExcludeTableId->uid) { // exclude (suid, uid) - if (pSttBlk->minUid == pExcludeTableId->uid && pSttBlk->maxUid == pExcludeTableId->uid) continue; - } else { // exclude (suid, *) - if (pSttBlk->suid == pExcludeTableId->suid) continue; - } - } - - code = tsdbReadSttBlockEx(pSttDIter->pReader, pSttDIter->iStt, pSttBlk, &pSttDIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->rowInfo.suid = pSttBlk->suid; - pSttDIter->iRow = 0; - break; - } else { - // iter end, all set 0 and exit - pIter->rowInfo.suid = 0; - pIter->rowInfo.uid = 0; - goto _exit; - } - } - } - - pIter->rowInfo.uid = pSttDIter->bData.uid ? pSttDIter->bData.uid : pSttDIter->bData.aUid[pSttDIter->iRow]; - pIter->rowInfo.row = tsdbRowFromBlockData(&pSttDIter->bData, pSttDIter->iRow); - - // check exclusion - if (pExcludeTableId) { - if (pExcludeTableId->uid) { // exclude (suid, uid) - if (pIter->rowInfo.uid == pExcludeTableId->uid) continue; - } else { // exclude (suid, *) - if (pIter->rowInfo.suid == pExcludeTableId->suid) continue; - } - } - - break; - } - } else { - ASSERT(0); - } - -_exit: - return code; -} - -// COMPACT ========================= - -static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCompactor->pTsdb; - - code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs); - TSDB_CHECK_CODE(code, lino, _exit); - - taosThreadRwlockWrlock(&pTsdb->rwLock); - - code = tsdbFSCommit(pTsdb); - if (code) { - taosThreadRwlockUnlock(&pTsdb->rwLock); - TSDB_CHECK_CODE(code, lino, _exit); - } - - taosThreadRwlockUnlock(&pTsdb->rwLock); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCompactor->pTsdb; - - // TODO - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCompactor->pTsdb; - - code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbShallowCompact(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCompactor->pTsdb; - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbCompactNextRowImpl(STsdbCompactor *pCompactor, TABLEID *pExcludeTableId) { - int32_t code = 0; - int32_t lino = 0; - - for (;;) { - if (pCompactor->pIter) { - code = tsdbDataIterNext(pCompactor->pIter, pExcludeTableId); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { - pCompactor->pIter = NULL; - } else { - SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); - if (pNode) { - STsdbDataIter *pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); - - int32_t c = tRowInfoCmprFn(&pCompactor->pIter->rowInfo, &pIter->rowInfo); - ASSERT(c); - - if (c > 0) { - tRBTreePut(&pCompactor->rtree, &pCompactor->pIter->n); - pCompactor->pIter = NULL; - } - } - } - } - - if (pCompactor->pIter == NULL) { - SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rtree); - if (pNode) { - pCompactor->pIter = TSDB_DATA_ITER_FROM_RBTN(pNode); - tRBTreeDrop(&pCompactor->rtree, pNode); - - if (pExcludeTableId) { - if (pExcludeTableId->uid) { - if (pCompactor->pIter->rowInfo.uid == pExcludeTableId->uid) continue; - } else { - if (pCompactor->pIter->rowInfo.suid == pExcludeTableId->suid) continue; - } - } - } - } - - break; - } - -_exit: - return code; -} - -static int32_t tDelIdxCmprFn(const SDelIdx *pDelIdx1, const SDelIdx *pDelIdx2) { - if (pDelIdx1->suid < pDelIdx2->suid) { - return -1; - } else if (pDelIdx1->suid > pDelIdx2->suid) { - return 1; - } - - if (pDelIdx1->uid < pDelIdx2->uid) { - return -1; - } else if (pDelIdx1->uid > pDelIdx2->uid) { - return 1; - } - - return 0; -} - -static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - TABLEID excludeTableId; - TABLEID *pExcludeTableId = NULL; - - for (;;) { - code = tsdbCompactNextRowImpl(pCompactor, pExcludeTableId); - TSDB_CHECK_CODE(code, lino, _exit); - - // check if the table of the row exists - if (pCompactor->pIter) { - SRowInfo *pRowInfo = &pCompactor->pIter->rowInfo; - - if (pRowInfo->uid != pCompactor->tbSkm.uid) { - SMetaInfo info; - if (pRowInfo->suid) { // child table - - // check if super table exists - if (pRowInfo->suid != pCompactor->tbSkm.suid) { - if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) { - excludeTableId.suid = pRowInfo->suid; - excludeTableId.uid = 0; - pExcludeTableId = &excludeTableId; - continue; - } - - // super table exists - pCompactor->tbSkm.suid = pRowInfo->suid; - pCompactor->tbSkm.uid = 0; - tDestroyTSchema(pCompactor->tbSkm.pTSchema); - pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1); - if (pCompactor->tbSkm.pTSchema == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // check if table exists - if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) { - excludeTableId.suid = pRowInfo->suid; - excludeTableId.uid = pRowInfo->uid; - pExcludeTableId = &excludeTableId; - continue; - } - - // table exists - pCompactor->tbSkm.uid = pRowInfo->uid; - } else { // normal table - // check if table exists - if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->uid, &info, NULL) != TSDB_CODE_SUCCESS) { - excludeTableId.suid = pRowInfo->suid; - excludeTableId.uid = pRowInfo->uid; - pExcludeTableId = &excludeTableId; - continue; - } - - // table exists - pCompactor->tbSkm.suid = pRowInfo->suid; - pCompactor->tbSkm.uid = pRowInfo->uid; - tDestroyTSchema(pCompactor->tbSkm.pTSchema); - - pCompactor->tbSkm.pTSchema = metaGetTbTSchema(pCompactor->pTsdb->pVnode->pMeta, pRowInfo->suid, -1, 1); - if (pCompactor->tbSkm.pTSchema == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // load delData and build the skyline - if (pCompactor->pDelFReader) { - SDelIdx *pDelIdx = - taosArraySearch(pCompactor->aDelIdx, &(SDelIdx){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, - (__compar_fn_t)tDelIdxCmprFn, TD_EQ); - if (pDelIdx) { - code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1, - pCompactor->aSkyLine); - TSDB_CHECK_CODE(code, lino, _exit); - - pCompactor->aTSDBKEY = (TSDBKEY *)TARRAY_DATA(pCompactor->aSkyLine); - pCompactor->iKey = 0; - pCompactor->sKey.version = 0; - pCompactor->sKey.ts = pCompactor->aTSDBKEY[0].ts; - } else { - pCompactor->aTSDBKEY = NULL; - } - } - } - - ASSERT(pRowInfo->uid == pCompactor->tbSkm.uid); - - // check if the row is deleted - if (pCompactor->aTSDBKEY && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) { - continue; - } else { - break; - } - } else { - // iter end, just break out - break; - } - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo, STSchema **ppTSchema) { - int32_t code = 0; - int32_t lino = 0; - - if (pCompactor->pIter == NULL) { - code = tsdbCompactNextRow(pCompactor); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (pCompactor->pIter) { - ASSERT(pCompactor->pIter->rowInfo.suid == pCompactor->tbSkm.suid); - ASSERT(pCompactor->pIter->rowInfo.uid == pCompactor->tbSkm.uid); - *ppRowInfo = &pCompactor->pIter->rowInfo; - *ppTSchema = pCompactor->tbSkm.pTSchema; - } else { - *ppRowInfo = NULL; - *ppTSchema = NULL; - } - -_exit: - return code; -} - -static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCompactor->pTsdb; - - // reader - code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); - TSDB_CHECK_CODE(code, lino, _exit); - - // open iters - STsdbDataIter *pIter; - - pCompactor->iterList = NULL; - tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn); - - code = tsdbDataDIterOpen(pCompactor->pReader, &pIter); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pIter) { - pIter->next = pCompactor->iterList; - pCompactor->iterList = pIter; - tRBTreePut(&pCompactor->rtree, &pIter->n); - } - - for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) { - code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pIter); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pIter) { - pIter->next = pCompactor->iterList; - pCompactor->iterList = pIter; - tRBTreePut(&pCompactor->rtree, &pIter->n); - } - } - pCompactor->pIter = NULL; - tBlockDataReset(&pCompactor->bData); - - // writer - SDFileSet wSet = {.diskId = (SDiskID){0}, // TODO - .fid = pCompactor->pDFileSet->fid, - .pHeadF = &(SHeadFile){.commitID = pCompactor->cid}, - .pDataF = &(SDataFile){.commitID = pCompactor->cid}, - .pSmaF = &(SSmaFile){.commitID = pCompactor->cid}, - .nSttF = 1, - .aSttF = {&(SSttFile){.commitID = pCompactor->cid}}}; - code = tsdbDataFWriterOpen(&pCompactor->pWriter, pTsdb, &wSet); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->aBlockIdx == NULL) { - pCompactor->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCompactor->aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - taosArrayClear(pCompactor->aBlockIdx); - } - - tMapDataReset(&pCompactor->mDataBlk); - - if (pCompactor->aSttBlk == NULL) { - pCompactor->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pCompactor->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - taosArrayClear(pCompactor->aSttBlk); - } - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); - } - return code; -} - -static void tsdbCloseCompactor(STsdbCompactor *pCompactor) { - for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) { - STsdbDataIter *pIterNext = pIter->next; - tsdbDataIterClose(pIter); - pIter = pIterNext; - } - - tDestroyTSchema(pCompactor->tbSkm.pTSchema); - pCompactor->tbSkm.pTSchema = NULL; - - tsdbDataFReaderClose(&pCompactor->pReader); -} - -extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg); -extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg); -static int32_t tsdbCompactWriteBlockData(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - SBlockData *pBData = &pCompactor->bData; - - if (pBData->nRow == 0) goto _exit; - - if (pBData->uid && pBData->nRow >= pCompactor->minRows) { // write to .data file - code = tsdbWriteDataBlock(pCompactor->pWriter, pBData, &pCompactor->mDataBlk, pCompactor->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - - pCompactor->tableId.suid = pBData->suid; - pCompactor->tableId.uid = pBData->uid; - } else { // write to .stt file - code = tsdbWriteSttBlock(pCompactor->pWriter, pBData, pCompactor->aSttBlk, pCompactor->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - - tBlockDataClear(&pCompactor->bData); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCompactWriteDataBlk(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - - if (pCompactor->mDataBlk.nItem == 0) goto _exit; - - SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1); - if (pBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pBlockIdx->suid = pCompactor->tableId.suid; - pBlockIdx->uid = pCompactor->tableId.uid; - - code = tsdbWriteDataBlk(pCompactor->pWriter, &pCompactor->mDataBlk, pBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - tMapDataReset(&pCompactor->mDataBlk); - -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -#endif - extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo); extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg); extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg); @@ -743,6 +57,48 @@ typedef struct { SBlockData sData; } STsdbCompactor; +static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + STsdb *pTsdb = pCompactor->pTsdb; + + code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + taosThreadRwlockWrlock(&pTsdb->rwLock); + + code = tsdbFSCommit(pTsdb); + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosThreadRwlockUnlock(&pTsdb->rwLock); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) { + int32_t code = 0; + int32_t lino = 0; + + STsdb *pTsdb = pCompactor->pTsdb; + + // TODO + ASSERT(0); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEID *pId) { int32_t code = 0; int32_t lino = 0; @@ -1253,20 +609,18 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { TSDB_CHECK_CODE(code, lino, _exit); } -#if 0 - code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL); TSDB_CHECK_CODE(code, lino, _exit); -#endif _exit: - // // commit/abort compact - // if (code) { - // tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - // tsdbAbortCompact(pCompactor); - // } else { - // tsdbCommitCompact(pCompactor); - // } + // commit/abort compact + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, commit ID:%" PRId64, TD_VID(pTsdb->pVnode), __func__, lino, + tstrerror(code), pCompactor->commitID); + tsdbAbortCompact(pCompactor); + } else { + tsdbCommitCompact(pCompactor); + } tsdbEndCompact(pCompactor); return code; }