diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b2e1e8ab34..be624ebcc5 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -406,11 +406,9 @@ struct SMemTable { TSKEY maxKey; int64_t nRow; int64_t nDel; - struct { - int32_t nTbData; - int32_t nBucket; - STbData **aBucket; - }; + int32_t nTbData; + int32_t nBucket; + STbData **aBucket; }; struct TSDBROW { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index fd2f3b84f4..77495e59ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -15,32 +15,9 @@ #include "tsdb.h" -typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT; - -#define USE_STREAM_COMPRESSION 0 - -typedef struct { - SRBTreeNode n; - SRowInfo r; - EDataIterT type; - union { - struct { - int32_t iTbDataP; - STbDataIter iter; - }; // memory data iter - struct { - int32_t iStt; - SArray *aSttBlk; - int32_t iSttBlk; - SBlockData bData; - int32_t iRow; - }; // stt file data iter - }; -} SDataIter; - typedef struct { STsdb *pTsdb; - /* commit data */ + // config int64_t commitID; int32_t minutes; int8_t precision; @@ -48,1314 +25,95 @@ typedef struct { int32_t maxRow; int8_t cmprAlg; int8_t sttTrigger; - SArray *aTbDataP; // memory - STsdbFS fs; // disk - // -------------- - TSKEY nextKey; // reset by each table commit - int32_t commitFid; - int32_t expLevel; - TSKEY minKey; - TSKEY maxKey; - // commit file data - struct { - SDataFReader *pReader; - SArray *aBlockIdx; // SArray - int32_t iBlockIdx; - SBlockIdx *pBlockIdx; - SMapData mBlock; // SMapData - SBlockData bData; - } dReader; - struct { - SDataIter *pIter; - SRBTree rbt; - SDataIter dataIter; - SDataIter aDataIter[TSDB_MAX_STT_TRIGGER]; - int8_t toLastOnly; - }; - struct { - SDataFWriter *pWriter; - SArray *aBlockIdx; // SArray - SArray *aSttBlk; // SArray - SMapData mBlock; // SMapData - SBlockData bData; - SBlockData bDatal; - } dWriter; - SSkmInfo skmTable; - SSkmInfo skmRow; - /* commit del */ - SDelFReader *pDelFReader; - SDelFWriter *pDelFWriter; - SArray *aDelIdx; // SArray - SArray *aDelIdxN; // SArray - SArray *aDelData; // SArray + SArray *aTbDataP; + // context + TSKEY nextKey; // reset by each table commit + int32_t fid; + int32_t expLevel; + TSKEY minKey; + TSKEY maxKey; + SSkmInfo skmTable; + SSkmInfo skmRow; + SBlockData bData; + SColData aColData[4]; // + SArray *aSttBlk; // SArray + SArray *aDelBlk; // SArray } SCommitter; -extern int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg); -extern int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg); -extern int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo); - -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo); -static int32_t tsdbCommitData(SCommitter *pCommitter); -static int32_t tsdbCommitDel(SCommitter *pCommitter); -static int32_t tsdbCommitCache(SCommitter *pCommitter); -static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); -static int32_t tsdbNextCommitRow(SCommitter *pCommitter); - -static int32_t tsdbCommitDelStart(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - SDelFile *pDelFileR = pCommitter->fs.pDelFile; - if (pDelFileR) { - code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // prepare new - SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0}; - code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode)); - } - return code; +static int32_t tsdbRowIsDeleted(SCommitter *pCommitter, TSDBROW *pRow) { + // TODO + ASSERT(0); + return 0; } -static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) { - int32_t code = 0; - int32_t lino = 0; - SDelData *pDelData; - tb_uid_t suid; - tb_uid_t uid; - - if (pTbData) { - suid = pTbData->suid; - uid = pTbData->uid; - - if (pTbData->pHead == NULL) { - pTbData = NULL; - } - } - - if (pDelIdx) { - suid = pDelIdx->suid; - uid = pDelIdx->uid; - - code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - taosArrayClear(pCommitter->aDelData); - } - - if (pTbData == NULL && pDelIdx == NULL) goto _exit; - - SDelIdx delIdx = {.suid = suid, .uid = uid}; - - // memory - pDelData = pTbData ? pTbData->pHead : NULL; - for (; pDelData; pDelData = pDelData->pNext) { - if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - // write - code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - // put delIdx - if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - - code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->pDelFReader) { - code = tsdbDelFReaderClose(&pCommitter->pDelFReader); - TSDB_CHECK_CODE(code, lino, _exit); - } - - taosArrayDestroy(pCommitter->aDelIdx); - taosArrayDestroy(pCommitter->aDelData); - taosArrayDestroy(pCommitter->aDelIdxN); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) { +static int32_t tsdbCommitTimeSeriesData(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; - if (pCommitter->skmRow.pTSchema) { - if (pCommitter->skmRow.suid == suid) { - if (suid == 0) { - if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit; - } else { - if (sver == pCommitter->skmRow.pTSchema->version) goto _exit; - } - } - } - - pCommitter->skmRow.suid = suid; - pCommitter->skmRow.uid = uid; - tDestroyTSchema(pCommitter->skmRow.pTSchema); - code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - return code; -} - -static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - ASSERT(pCommitter->dReader.pBlockIdx); - - pCommitter->dReader.iBlockIdx++; - if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) { - pCommitter->dReader.pBlockIdx = - (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - - code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); - TSDB_CHECK_CODE(code, lino, _exit); - - ASSERT(pCommitter->dReader.mBlock.nItem > 0); - } else { - pCommitter->dReader.pBlockIdx = NULL; - } - -_exit: - return code; -} - -static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { - SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n)); - SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n)); - - return tRowInfoCmprFn(&pIter1->r, &pIter2->r); -} - -static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - pCommitter->pIter = NULL; - tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn); - - // memory - TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN}; - SDataIter *pIter = &pCommitter->dataIter; - pIter->type = MEMORY_DATA_ITER; - pIter->iTbDataP = 0; - for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) { - STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); - tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter); - TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - pRow = NULL; - } - - if (pRow == NULL) continue; - - pIter->r.suid = pTbData->suid; - pIter->r.uid = pTbData->uid; - pIter->r.row = *pRow; - break; - } - ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)); - tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); - - // disk - pCommitter->toLastOnly = 0; - SDataFReader *pReader = pCommitter->dReader.pReader; - if (pReader) { - if (pReader->pSet->nSttF >= pCommitter->sttTrigger) { - int8_t iIter = 0; - for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { - pIter = &pCommitter->aDataIter[iIter]; - pIter->type = STT_DATA_ITER; - pIter->iStt = iStt; - - code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayGetSize(pIter->aSttBlk) == 0) continue; - - pIter->iSttBlk = 0; - SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0); - code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pIter->iRow = 0; - pIter->r.suid = pIter->bData.suid; - pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0); - - tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter); - iIter++; - } - } else { - for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { - SSttFile *pSttFile = pReader->pSet->aSttF[iStt]; - if (pSttFile->size > pSttFile->offset) { - pCommitter->toLastOnly = 1; - break; - } - } - } - } - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); + // TODO _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); + tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); } return code; } -static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SDFileSet *pRSet = NULL; - - // memory - pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); - pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec()); - tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, - &pCommitter->maxKey); - - pCommitter->nextKey = TSKEY_MAX; - - // Reader - SDFileSet tDFileSet = {.fid = pCommitter->commitFid}; - pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ); - if (pRSet) { - code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); - TSDB_CHECK_CODE(code, lino, _exit); - - // data - code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - pCommitter->dReader.iBlockIdx = 0; - if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { - pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); - code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - pCommitter->dReader.pBlockIdx = NULL; - } - tBlockDataReset(&pCommitter->dReader.bData); - } else { - pCommitter->dReader.pBlockIdx = NULL; - } - - // Writer - SHeadFile fHead = {.commitID = pCommitter->commitID}; - SDataFile fData = {.commitID = pCommitter->commitID}; - SSmaFile fSma = {.commitID = pCommitter->commitID}; - SSttFile fStt = {.commitID = pCommitter->commitID}; - SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; - if (pRSet) { - ASSERT(pRSet->nSttF <= pCommitter->sttTrigger); - fData = *pRSet->pDataF; - fSma = *pRSet->pSmaF; - wSet.diskId = pRSet->diskId; - if (pRSet->nSttF < pCommitter->sttTrigger) { - for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) { - wSet.aSttF[iStt] = pRSet->aSttF[iStt]; - } - wSet.nSttF = pRSet->nSttF + 1; - } else { - wSet.nSttF = 1; - } - } else { - SDiskID did = {0}; - if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) { - code = terrno; - TSDB_CHECK_CODE(code, lino, _exit); - } - tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); - wSet.diskId = did; - wSet.nSttF = 1; - } - wSet.aSttF[wSet.nSttF - 1] = &fStt; - code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); - TSDB_CHECK_CODE(code, lino, _exit); - - taosArrayClear(pCommitter->dWriter.aBlockIdx); - taosArrayClear(pCommitter->dWriter.aSttBlk); - tMapDataReset(&pCommitter->dWriter.mBlock); - tBlockDataReset(&pCommitter->dWriter.bData); - tBlockDataReset(&pCommitter->dWriter.bDatal); - - // open iter - code = tsdbOpenCommitIter(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) { +static int32_t tsdbCommitDelData(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; - if (pBuilder->nRow == 0) return code; - - // gnrt - const SDiskData *pDiskData; - const SBlkInfo *pBlkInfo; - code = tGnrtDiskData(pBuilder, &pDiskData, &pBlkInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - SSttBlk sttBlk = {.suid = pBuilder->suid, - .minUid = pBlkInfo->minUid, - .maxUid = pBlkInfo->maxUid, - .minKey = pBlkInfo->minKey, - .maxKey = pBlkInfo->maxKey, - .minVer = pBlkInfo->minVer, - .maxVer = pBlkInfo->maxVer, - .nRow = pBuilder->nRow}; - // write - code = tsdbWriteDiskData(pWriter, pDiskData, &sttBlk.bInfo, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - - // push - if (taosArrayPush(aSttBlk, &sttBlk) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - // clear - tDiskDataBuilderClear(pBuilder); + // TODO _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); + tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code)); } return code; } -static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { +static int32_t tsdbCommitNextFSet(SCommitter *pCommitter, int8_t *done) { int32_t code = 0; int32_t lino = 0; - // write aBlockIdx - code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); + STsdb *pTsdb = pCommitter->pTsdb; + + // fset commit start (TODO) + + // commit fset + code = tsdbCommitTimeSeriesData(pCommitter); TSDB_CHECK_CODE(code, lino, _exit); - // write aSttBlk - code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - // update file header - code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - // upsert SDFileSet - code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet); - TSDB_CHECK_CODE(code, lino, _exit); - - // close and sync - code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->dReader.pReader) { - code = tsdbDataFReaderClose(&pCommitter->dReader.pReader); - TSDB_CHECK_CODE(code, lino, _exit); - } + // fset commit end (TODO) _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } return code; } -static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { +static int32_t tsdbCommitterOpen(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; - while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { - SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; - code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbCommitterNextTableData(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter); -static int32_t tsdbCommitFileData(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - // commit file data start - code = tsdbCommitFileDataStart(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - // impl - code = tsdbCommitFileDataImpl(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - // commit file data end - code = tsdbCommitFileDataEnd(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - tsdbDataFReaderClose(&pCommitter->dReader.pReader); - tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); - } - return code; -} - -// ---------------------------------------------------------------------------- -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo) { - int32_t code = 0; - int32_t lino = 0; - - memset(pCommitter, 0, sizeof(*pCommitter)); - ASSERT(pTsdb->imem && "last tsdb commit incomplete"); - + memset(pCommitter, 0, sizeof(SCommitter)); pCommitter->pTsdb = pTsdb; - pCommitter->commitID = pInfo->info.state.commitID; - pCommitter->minutes = pTsdb->keepCfg.days; - pCommitter->precision = pTsdb->keepCfg.precision; - pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows; - pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows; - pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression; - pCommitter->sttTrigger = pInfo->info.config.sttTrigger; - pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); - if (pCommitter->aTbDataP == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - code = tsdbFSCopy(pTsdb, &pCommitter->fs); - TSDB_CHECK_CODE(code, lino, _exit); + + // TODO _exit: if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - // reader - pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCommitter->dReader.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pCommitter->dReader.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - // merger - for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) { - SDataIter *pIter = &pCommitter->aDataIter[iStt]; - pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pIter->aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pIter->bData); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // writer - pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCommitter->dWriter.aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pCommitter->dWriter.aSttBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tBlockDataCreate(&pCommitter->dWriter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataCreate(&pCommitter->dWriter.bDatal); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static void tsdbCommitDataEnd(SCommitter *pCommitter) { - // reader - taosArrayDestroy(pCommitter->dReader.aBlockIdx); - tMapDataClear(&pCommitter->dReader.mBlock); - tBlockDataDestroy(&pCommitter->dReader.bData); - - // merger - for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) { - SDataIter *pIter = &pCommitter->aDataIter[iStt]; - taosArrayDestroy(pIter->aSttBlk); - tBlockDataDestroy(&pIter->bData); - } - - // writer - taosArrayDestroy(pCommitter->dWriter.aBlockIdx); - taosArrayDestroy(pCommitter->dWriter.aSttBlk); - tMapDataClear(&pCommitter->dWriter.mBlock); - tBlockDataDestroy(&pCommitter->dWriter.bData); - tBlockDataDestroy(&pCommitter->dWriter.bDatal); - tDestroyTSchema(pCommitter->skmTable.pTSchema); - tDestroyTSchema(pCommitter->skmRow.pTSchema); -} - -static int32_t tsdbCommitData(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - // check - if (pMemTable->nRow == 0) goto _exit; - - // start ==================== - code = tsdbCommitDataStart(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - // impl ==================== - pCommitter->nextKey = pMemTable->minKey; - while (pCommitter->nextKey < TSKEY_MAX) { - code = tsdbCommitFileData(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // end ==================== - tsdbCommitDataEnd(pCommitter); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitDel(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - STsdb *pTsdb = pCommitter->pTsdb; - SMemTable *pMemTable = pTsdb->imem; - - if (pMemTable->nDel == 0) { - goto _exit; - } - - // start - code = tsdbCommitDelStart(pCommitter); - if (code) { - TSDB_CHECK_CODE(code, lino, _exit); - } - - // impl - int32_t iDelIdx = 0; - int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx); - int32_t iTbData = 0; - int32_t nTbData = taosArrayGetSize(pCommitter->aTbDataP); - STbData *pTbData; - SDelIdx *pDelIdx; - - ASSERT(nTbData > 0); - - pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); - pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; - while (true) { - if (pTbData == NULL && pDelIdx == NULL) break; - - if (pTbData && pDelIdx) { - int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx); - - if (c == 0) { - goto _commit_mem_and_disk_del; - } else if (c < 0) { - goto _commit_mem_del; - } else { - goto _commit_disk_del; - } - } else if (pTbData) { - goto _commit_mem_del; - } else { - goto _commit_disk_del; - } - - _commit_mem_del: - code = tsdbCommitTableDel(pCommitter, pTbData, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - - iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; - continue; - - _commit_disk_del: - code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - iDelIdx++; - pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; - continue; - - _commit_mem_and_disk_del: - code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - iTbData++; - pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL; - iDelIdx++; - pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL; - continue; - } - - // end - code = tsdbCommitDelEnd(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } else { - tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel); + tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); } return code; } -static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) { +static int32_t tsdbCommitterClose(SCommitter *pCommiter, int32_t eno) { int32_t code = 0; - int32_t lino = 0; - STsdb *pTsdb = pCommitter->pTsdb; - - if (eno) { - code = eno; - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - tsdbFSDestroy(&pCommitter->fs); - taosArrayDestroy(pCommitter->aTbDataP); - pCommitter->aTbDataP = NULL; - if (code || eno) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } else { - tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode)); - } - return code; -} - -// ================================================================================ - -static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { - return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL; -} - -static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - if (pCommitter->pIter) { - SDataIter *pIter = pCommitter->pIter; - if (pCommitter->pIter->type == MEMORY_DATA_ITER) { // memory - tsdbTbDataIterNext(&pIter->iter); - TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter); - while (true) { - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - pRow = NULL; - } - - if (pRow) { - pIter->r.suid = pIter->iter.pTbData->suid; - pIter->r.uid = pIter->iter.pTbData->uid; - pIter->r.row = *pRow; - break; - } - - pIter->iTbDataP++; - if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) { - STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP); - TSDBKEY keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN}; - tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter); - pRow = tsdbTbDataIterGet(&pIter->iter); - continue; - } else { - pCommitter->pIter = NULL; - break; - } - } - } else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file - pIter->iRow++; - if (pIter->iRow < pIter->bData.nRow) { - pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; - pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); - } else { - pIter->iSttBlk++; - if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) { - SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); - - code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData); - if (code) goto _exit; - - pIter->iRow = 0; - pIter->r.suid = pIter->bData.suid; - pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; - pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0); - } else { - pCommitter->pIter = NULL; - } - } - } else { - ASSERT(0); - } - - // compare with min in RB Tree - pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt); - if (pCommitter->pIter && pIter) { - int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r); - if (c > 0) { - tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter); - pCommitter->pIter = NULL; - } else { - ASSERT(c); - } - } - } - - if (pCommitter->pIter == NULL) { - pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt); - if (pCommitter->pIter) { - tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter); - } - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { - int32_t code = 0; - int32_t lino = 0; - - SBlockData *pBlockData = &pCommitter->dWriter.bData; - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; - - tBlockDataClear(pBlockData); - while (pRowInfo) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo) { - if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) { - pRowInfo = NULL; - } else { - TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL; - } - } - - if (pBlockData->nRow >= pCommitter->maxRow) { - code = - tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { - int32_t code = 0; - int32_t lino = 0; - - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; - SBlockData *pBDataR = &pCommitter->dReader.bData; - SBlockData *pBDataW = &pCommitter->dWriter.bData; - - code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataClear(pBDataW); - int32_t iRow = 0; - TSDBROW row = tsdbRowFromBlockData(pBDataR, 0); - TSDBROW *pRow = &row; - - while (pRow && pRowInfo) { - int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); - if (c < 0) { - code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - iRow++; - if (iRow < pBDataR->nRow) { - row = tsdbRowFromBlockData(pBDataR, iRow); - } else { - pRow = NULL; - } - } else if (c > 0) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo) { - if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) { - pRowInfo = NULL; - } else { - TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL; - } - } - } else { - ASSERT(0 && "dup rows not allowed"); - } - - if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - while (pRow) { - code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - iRow++; - if (iRow < pBDataR->nRow) { - row = tsdbRowFromBlockData(pBDataR, iRow); - } else { - pRow = NULL; - } - - if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; - int32_t lino = 0; - - SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx; - - ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0); - if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) { - int32_t iBlock = 0; - SDataBlk block; - SDataBlk *pDataBlk = █ - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - - ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid); - - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - while (pDataBlk && pRowInfo) { - SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}; - int32_t c = tDataBlkCmprFn(pDataBlk, &tBlock); - - if (c < 0) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - } else { - pDataBlk = NULL; - } - } else if (c > 0) { - code = tsdbCommitAheadBlock(pCommitter, pDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; - } else { - code = tsdbCommitMergeBlock(pCommitter, pDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - } else { - pDataBlk = NULL; - } - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; - } - } - - while (pDataBlk) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); - TSDB_CHECK_CODE(code, lino, _exit); - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); - } else { - pDataBlk = NULL; - } - } - - code = tsdbCommitterNextTableData(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; - int32_t lino = 0; - - SBlockData *pBData = &pCommitter->dWriter.bDatal; - if (pBData->suid || pBData->uid) { - if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataReset(pBData); - } - } - - if (!pBData->suid && !pBData->uid) { - ASSERT(pCommitter->skmTable.suid == id.suid); - ASSERT(pCommitter->skmTable.uid == id.uid); - TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid}; - code = tBlockDataInit(pBData, &tid, pCommitter->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - SBlockData *pBData = &pCommitter->dWriter.bData; - TABLEID id = {.suid = pBData->suid, .uid = pBData->uid}; - - code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { - TSDBROW row = tsdbRowFromBlockData(pBData, iRow); - - code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &row, NULL, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { - int32_t code = 0; - int32_t lino = 0; - - SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; - } - - if (pRowInfo == NULL) goto _exit; - - if (pCommitter->toLastOnly) { - code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - while (pRowInfo) { - STSchema *pTSchema = NULL; - if (pRowInfo->row.type == TSDBROW_ROW_FMT) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - pTSchema = pCommitter->skmRow.pTSchema; - } - - code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; - } - - if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } else { - SBlockData *pBData = &pCommitter->dWriter.bData; - ASSERT(pBData->nRow == 0); - - while (pRowInfo) { - STSchema *pTSchema = NULL; - if (pRowInfo->row.type == TSDBROW_ROW_FMT) { - code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); - TSDB_CHECK_CODE(code, lino, _exit); - pTSchema = pCommitter->skmRow.pTSchema; - } - - code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbNextCommitRow(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - - pRowInfo = tsdbGetCommitRow(pCommitter); - if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { - pRowInfo = NULL; - } - - if (pBData->nRow >= pCommitter->maxRow) { - code = - tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - if (pBData->nRow) { - if (pBData->nRow > pCommitter->minRow) { - code = - tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbAppendLastBlock(pCommitter); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } - return code; -} - -static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { - int32_t code = 0; - int32_t lino = 0; - - SRowInfo *pRowInfo; - TABLEID id = {0}; - while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) { - ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid); - id.suid = pRowInfo->suid; - id.uid = pRowInfo->uid; - - code = tsdbMoveCommitData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - // start - tMapDataReset(&pCommitter->dWriter.mBlock); - - // impl - code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); - TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - /* merge with data in .data file */ - code = tsdbMergeTableData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - /* handle remain table data */ - code = tsdbCommitTableData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - // end - if (pCommitter->dWriter.mBlock.nItem > 0) { - SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; - code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); - TSDB_CHECK_CODE(code, lino, _exit); - - if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } - - id.suid = INT64_MAX; - id.uid = INT64_MAX; - code = tsdbMoveCommitData(pCommitter, id); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino, - tstrerror(code)); - } + // TODO return code; } @@ -1365,7 +123,6 @@ int32_t tsdbPreCommit(STsdb *pTsdb) { pTsdb->imem = pTsdb->mem; pTsdb->mem = NULL; taosThreadRwlockUnlock(&pTsdb->rwLock); - return 0; } @@ -1374,42 +131,38 @@ int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) { int32_t code = 0; int32_t lino = 0; - SCommitter commith; - SMemTable *pMemTable = pTsdb->imem; + SMemTable *pMem = pTsdb->imem; - // check - if (pMemTable->nRow == 0 && pMemTable->nDel == 0) { + if (pMem->nRow == 0 && pMem->nDel == 0) { taosThreadRwlockWrlock(&pTsdb->rwLock); pTsdb->imem = NULL; taosThreadRwlockUnlock(&pTsdb->rwLock); + tsdbUnrefMemTable(pMem, NULL, true); + } else { + SCommitter committer; + int8_t done = 0; - tsdbUnrefMemTable(pMemTable, NULL, true); - goto _exit; + code = tsdbCommitterOpen(pTsdb, pInfo, &committer); + TSDB_CHECK_CODE(code, lino, _exit); + + while (!done && (code = tsdbCommitNextFSet(&committer, &done))) { + } + + code = tsdbCommitterClose(&committer, code); + TSDB_CHECK_CODE(code, lino, _exit); } - // start commit - code = tsdbStartCommit(pTsdb, &commith, pInfo); - TSDB_CHECK_CODE(code, lino, _exit); - - // commit impl - code = tsdbCommitData(&commith); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbCommitDel(&commith); - TSDB_CHECK_CODE(code, lino, _exit); - - // end commit - code = tsdbEndCommit(&commith, 0); - TSDB_CHECK_CODE(code, lino, _exit); - _exit: if (code) { - tsdbEndCommit(&commith, code); - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pMem->nRow, + pMem->nDel); } return code; } +#if 0 int32_t tsdbCommitCommit(STsdb *pTsdb) { int32_t code = 0; int32_t lino = 0; @@ -1455,4 +208,5 @@ _exit: tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode)); } return code; -} \ No newline at end of file +} +#endif \ No newline at end of file