diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 4948d8a0b1..6c24396810 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -19,20 +19,18 @@ typedef struct { SMemTable *pMemTable; int32_t minutes; int8_t precision; - int32_t sfid; - int32_t efid; + TSKEY nCommitKey; SReadH readh; SDFileSet wSet; - SArray *aDelInfo; SArray *aBlkIdx; SArray *aSupBlk; SArray *aSubBlk; + SArray *aDelInfo; } SCommitH; -static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb); -static int32_t tsdbEndCommit(SCommitH *pCHandle); -static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid); -static int32_t tsdbCommitDelete(SCommitH *pCHandle); +static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb); +static int32_t tsdbCommitEnd(SCommitH *pCHandle); +static int32_t tsdbCommitImpl(SCommitH *pCHandle); int32_t tsdbBegin2(STsdb *pTsdb) { int32_t code = 0; @@ -53,26 +51,19 @@ int32_t tsdbCommit2(STsdb *pTsdb) { SCommitH ch = {0}; // start to commit - code = tsdbStartCommit(&ch, pTsdb); + code = tsdbCommitStart(&ch, pTsdb); if (code) { goto _exit; } // commit - for (int32_t fid = ch.sfid; fid <= ch.efid; fid++) { - code = tsdbCommitToFile(&ch, fid); - if (code) { - goto _err; - } - } - - code = tsdbCommitDelete(&ch); + code = tsdbCommitImpl(&ch); if (code) { goto _err; } // end commit - code = tsdbEndCommit(&ch); + code = tsdbCommitEnd(&ch); if (code) { goto _exit; } @@ -81,11 +72,11 @@ _exit: return code; _err: - // TODO: rollback + tsdbError("vgId:%d failed to commit since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) { +static int32_t tsdbCommitStart(SCommitH *pCHandle, STsdb *pTsdb) { int32_t code = 0; SMemTable *pMemTable = (SMemTable *)pTsdb->mem; @@ -100,28 +91,32 @@ static int32_t tsdbStartCommit(SCommitH *pCHandle, STsdb *pTsdb) { pCHandle->pMemTable = pMemTable; pCHandle->minutes = pTsdb->keepCfg.days; pCHandle->precision = pTsdb->keepCfg.precision; - pCHandle->sfid = TSDB_KEY_FID(pMemTable->minKey.ts, pCHandle->minutes, pCHandle->precision); - pCHandle->efid = TSDB_KEY_FID(pMemTable->maxKey.ts, pCHandle->minutes, pCHandle->precision); + pCHandle->nCommitKey = pMemTable->minKey.ts; code = tsdbInitReadH(&pCHandle->readh, pTsdb); if (code) { goto _err; } - pCHandle->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + pCHandle->aBlkIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pCHandle->aBlkIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pCHandle->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); + pCHandle->aSupBlk = taosArrayInit(0, sizeof(SBlock)); if (pCHandle->aSupBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pCHandle->aSubBlk = taosArrayInit(1024, sizeof(SBlock)); + pCHandle->aSubBlk = taosArrayInit(0, sizeof(SBlock)); if (pCHandle->aSubBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + pCHandle->aDelInfo = taosArrayInit(0, sizeof(SDelInfo)); + if (pCHandle->aDelInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } // start FS transaction tsdbStartFSTxn(pTsdb, 0, 0); @@ -132,7 +127,7 @@ _err: return code; } -static int32_t tsdbEndCommit(SCommitH *pCHandle) { +static int32_t tsdbCommitEnd(SCommitH *pCHandle) { int32_t code = 0; STsdb *pTsdb = pCHandle->pMemTable->pTsdb; SMemTable *pMemTable = (SMemTable *)pTsdb->imem; @@ -144,6 +139,7 @@ static int32_t tsdbEndCommit(SCommitH *pCHandle) { } // close handle + taosArrayClear(pCHandle->aDelInfo); taosArrayClear(pCHandle->aSubBlk); taosArrayClear(pCHandle->aSupBlk); taosArrayClear(pCHandle->aBlkIdx); @@ -160,10 +156,29 @@ _err: return code; } -static int32_t tsdbCommitTableData(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) { +static int32_t tsdbCommitTableStart(SCommitH *pCHandle) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitTableEnd(SCommitH *pCHandle) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitTable(SCommitH *pCHandle, SMemData *pMemData, SBlockIdx *pBlockIdx) { int32_t code = 0; SMemDataIter iter = {0}; + // commit table start + code = tsdbCommitTableStart(pCHandle); + if (code) { + goto _err; + } + + // commit table impl if (pMemData && pBlockIdx) { // merge } else if (pMemData) { @@ -172,6 +187,15 @@ static int32_t tsdbCommitTableData(SCommitH *pCHandle, SMemData *pMemData, SBloc // save old ones } + // commit table end + code = tsdbCommitTableEnd(pCHandle); + if (code) { + goto _err; + } + + return code; + +_err: return code; } @@ -193,38 +217,58 @@ static int32_t tsdbTableIdCmprFn(const void *p1, const void *p2) { return 0; } -static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) { + +static int32_t tsdbWriteBlockIdx(SDFile *pFile, SArray *pArray, uint8_t **ppBuf) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitFileStart(SCommitH *pCHandle, int32_t fid) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitFileEnd(SCommitH *pCHandle) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitFile(SCommitH *pCHandle, int32_t fid) { int32_t code = 0; SMemDataIter iter = {0}; TSDBROW *pRow = NULL; int8_t hasData = 0; TSKEY fidSKey; TSKEY fidEKey; - int32_t iMemData = 0; - int32_t nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData); - int32_t iBlockIdx = 0; + int32_t iMemData; + int32_t nMemData; + int32_t iBlockIdx; int32_t nBlockIdx; - // check if there are data in the time range - for (; iMemData < nMemData; iMemData++) { - SMemData *pMemData = (SMemData *)taosArrayGetP(pCHandle->pMemTable->aMemData, iMemData); - tsdbMemDataIterOpen(pMemData, &(TSDBKEY){.ts = fidSKey, .version = 0}, 0, &iter); - tsdbMemDataIterGet(&iter, &pRow); - - if (pRow->tsRow.ts >= fidSKey && pRow->tsRow.ts <= fidEKey) { - hasData = 1; - break; - } - } - - if (!hasData) return code; + pCHandle->nCommitKey = TSKEY_MAX; // create or open the file to commit(todo) + code = tsdbCommitFileStart(pCHandle, fid); + if (code) { + goto _err; + } // loop to commit each table data + iMemData = 0; + nMemData = taosArrayGetSize(pCHandle->pMemTable->aMemData); + iBlockIdx = 0; nBlockIdx = 0; for (;;) { - if (iBlockIdx >= nBlockIdx && iMemData >= nMemData) break; + if (iBlockIdx >= nBlockIdx && iMemData >= nMemData) { + // code = tsdbWriteBlockIdx(); + // if (code) { + // goto _err; + // } + break; + } SMemData *pMemData = NULL; SBlockIdx *pBlockIdx = NULL; @@ -256,12 +300,42 @@ static int32_t tsdbCommitToFile(SCommitH *pCHandle, int32_t fid) { } } - code = tsdbCommitTableData(pCHandle, pMemData, pBlockIdx); + code = tsdbCommitTable(pCHandle, pMemData, pBlockIdx); if (code) { goto _err; } } + // close file + code = tsdbCommitFileEnd(pCHandle); + if (code) { + goto _err; + } + + return code; + +_err: + return code; +} + +static int32_t tsdbCommitData(SCommitH *pCHandle) { + int32_t code = 0; + int32_t fid; + + if (pCHandle->pMemTable->nRows == 0) goto _exit; + + // loop to commit to each file + for (;;) { + if (pCHandle->nCommitKey == TSKEY_MAX) break; + + fid = TSDB_KEY_FID(pCHandle->nCommitKey, pCHandle->minutes, pCHandle->precision); + code = tsdbCommitFile(pCHandle, fid); + if (code) { + goto _err; + } + } + +_exit: return code; _err: @@ -320,9 +394,46 @@ static int32_t tsdbCommitDelete(SCommitH *pCHandle) { taosArraySort(pCHandle->aDelInfo, delInfoCmprFn); + // write to new file + _exit: return code; +_err: + return code; +} + +static int32_t tsdbCommitCache(SCommitH *pCHandle) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCommitImpl(SCommitH *pCHandle) { + int32_t code = 0; + + // commit data + code = tsdbCommitData(pCHandle); + if (code) { + goto _err; + } + + // commit delete + code = tsdbCommitDelete(pCHandle); + if (code) { + goto _err; + } + + // commit cache if need (todo) + if (0) { + code = tsdbCommitCache(pCHandle); + if (code) { + goto _err; + } + } + + return code; + _err: return code; } \ No newline at end of file