diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 386b93309d..08c4f57aca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -431,162 +431,6 @@ _exit: return code; } -static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - STbDataIter *pIter = &(STbDataIter){0}; - TSDBKEY key = {.ts = pCommitter->minKey, .version = VERSION_MIN}; - TSDBROW row; - TSDBROW *pRow; - - // create iter - tsdbTbDataIterOpen(pTbData, &key, 0, pIter); - pRow = tsdbTbDataIterGet(pIter); - - if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) goto _exit; - - // main loop - SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pTbData->suid, .uid = pTbData->uid}; - SMapData *mBlock = &pCommitter->nBlockMap; - SBlock *pBlock = &pCommitter->nBlock; - SBlockData *pBlockData = &pCommitter->nBlockData; - TSKEY lastTS; - - tBlockIdxReset(pBlockIdx); - tMapDataReset(mBlock); - tBlockReset(pBlock); - tBlockDataReset(pBlockData); - lastTS = TSKEY_MIN; - while (1) { - if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) { - if (pBlockData->nRow > 0) { - goto _write_block; - } else { - break; - } - } - - // update schema - code = tsdbCommitterUpdateSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); - if (code) goto _err; - - // append - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); - if (code) goto _err; - - // update - pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); - pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow)); - pBlock->nRow++; - if (TSDBROW_TS(pRow) == lastTS) pBlock->hasDup = 1; - lastTS = TSDBROW_TS(pRow); - - // next - tsdbTbDataIterNext(pIter); - pRow = tsdbTbDataIterGet(pIter); - - // check - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; - continue; - - _write_block: - row = tBlockDataFirstRow(pBlockData); - if (tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&row)) > 0) pBlock->minKey = TSDBROW_KEY(&row); - row = tBlockDataLastRow(pBlockData); - if (tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&row)) < 0) pBlock->maxKey = TSDBROW_KEY(&row); - pBlock->last = pBlockData->nRow < pCommitter->minRow ? 1 : 0; - code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); - if (code) goto _err; - - // Design SMA and write SMA to file - - // SBlockIdx - code = tMapDataPutItem(mBlock, pBlock, tPutBlock); - if (code) goto _err; - pBlockIdx->minKey = TMIN(pBlockIdx->minKey, pBlock->minKey.ts); - pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts); - pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion); - pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion); - - tBlockReset(pBlock); - tBlockDataReset(pBlockData); - lastTS = TSKEY_MIN; - } - - // write block - code = tsdbWriteBlock(pCommitter->pWriter, mBlock, NULL, pBlockIdx); - if (code) goto _err; - - code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); - if (code) goto _err; - -_exit: - if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - return code; - -_err: - tsdbError("vgId:%d tsdb commit memory data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) { - int32_t code = 0; - SMapData *mBlockO = &pCommitter->oBlockMap; - SBlock *pBlockO = &pCommitter->oBlock; - SMapData *mBlockN = &pCommitter->nBlockMap; - SBlock *pBlockN = &pCommitter->nBlock; - SBlockIdx *pBlockIdx = &(SBlockIdx){0}; - SBlockData *pBlockDataO = &pCommitter->oBlockData; - - // read - code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, mBlockO, NULL); - if (code) goto _err; - - // loop to add to new - tMapDataReset(mBlockN); - for (int32_t iBlock = 0; iBlock < mBlockO->nItem; iBlock++) { - tMapDataGetItemByIdx(mBlockO, iBlock, pBlockO, tGetBlock); - - if (pBlockO->last) { - ASSERT(iBlock == mBlockO->nItem - 1); - code = tsdbReadBlockData(pCommitter->pReader, oBlockIdx, pBlockO, pBlockDataO, NULL, NULL); - if (code) goto _err; - - tBlockReset(pBlockN); - pBlockN->minKey = pBlockO->minKey; - pBlockN->maxKey = pBlockO->maxKey; - pBlockN->minVersion = pBlockO->minVersion; - pBlockN->maxVersion = pBlockO->maxVersion; - pBlockN->nRow = pBlockO->nRow; - pBlockN->last = pBlockO->last; - pBlockN->hasDup = pBlockO->hasDup; - code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN, pCommitter->cmprAlg); - if (code) goto _err; - - code = tMapDataPutItem(mBlockN, pBlockN, tPutBlock); - if (code) goto _err; - } else { - code = tMapDataPutItem(mBlockN, pBlockO, tPutBlock); - if (code) goto _err; - } - } - - // SBlock - *pBlockIdx = *oBlockIdx; - code = tsdbWriteBlock(pCommitter->pWriter, mBlockN, NULL, pBlockIdx); - if (code) goto _err; - - // SBlockIdx - code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d tsdb commit disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey, int8_t toDataOnly) { int32_t code = 0; @@ -820,123 +664,6 @@ _err: return code; } -static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *oBlockIdx) { - int32_t code = 0; - // STbDataIter *pIter = &(STbDataIter){0}; - // TSDBROW *pRow; - - // // create iter - // tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); - // pRow == tsdbTbDataIterGet(pIter); - // if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) { - // code = tsdbCommitDiskData(pCommitter, oBlockIdx); - // if (code) { - // goto _err; - // } else { - // goto _exit; - // } - // } - - // // start ================== - // // read - // code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL); - // if (code) goto _err; - - // // loop to merge - // // SBlockData *pBlockData = &pCommitter->nBlockData; - // int32_t iBlock = 0; - // int32_t nBlock = pCommitter->oBlockMap.nItem; - // // SBlock *pBlockO = &pCommitter->oBlock; - // SBlock *pBlock; - // int32_t c; - - // // merge =================== - // while (true) { - // if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; - - // if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) { - // if (pBlock->last) { - // // merge memory data and disk data to write to .data/.last (todo) - // code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, - // (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - // if (code) goto _err; - - // pRow = tsdbTbDataIterGet(pIter); - // iBlock++; - // } else { - // c = tBlockCmprFn(&(SBlock){}, pBlock); - - // if (c < 0) { - // // commit memory data until pBlock->minKey (not included) only to .data file (todo) - // code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); - // if (code) goto _err; - - // pRow = tsdbTbDataIterGet(pIter); - // } else if (c > 0) { - // // just move the block (todo) - // // code = tsdbCommitTableDiskData(pCommitter, pBlock); - // if (code) goto _err; - - // iBlock++; - // // TODO - // } else { - // int64_t nOvlp = 0; // = tsdbOvlpRows(); - // if (nOvlp + pBlock->nRow <= pCommitter->maxRow) { - // // add as a subblock - // } else { - // if (iBlock == nBlock - 1) { - // // merge memory data and disk data to .data/.last file - // code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, - // (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - // if (code) goto _err; - // } else { - // // merge memory data and disk data to .data file only until pBlock[1]. - // code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1); - // } - // } - - // pRow = tsdbTbDataIterGet(pIter); - // iBlock++; - // } - // } - // } else if (pBlock) { - // // code = tsdbCommitTableDiskData(pCommitter, pBlock); - // if (code) goto _err; - - // iBlock++; - // // next block - // } else { - // // commit only memory data until (pCommitter->maxKey, VERSION_MAX) - // code = - // tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = - // VERSION_MIN}, 0); - // if (code) goto _err; - - // pRow = tsdbTbDataIterGet(pIter); - // } - // } - - // // end ===================== - // // SBlock - // // code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx); - // // if (code) goto _err; - - // // // SBlockIdx - // // code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); - // // if (code) goto _err; - - // _exit: - // pRow = tsdbTbDataIterGet(pIter); - // if (pRow) { - // pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - // } - return code; - - // _err: - // tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), - // tstrerror(code)); return code; -} - static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { int32_t code = 0; SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = suid, .uid = uid}; @@ -1190,7 +917,6 @@ _err: static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t code = 0; - int32_t c; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; int32_t iTbData = 0; @@ -1209,41 +935,49 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { pBlockIdx = NULL; } - // merge - while (pTbData && pBlockIdx) { - c = tTABLEIDCmprFn(pTbData, pBlockIdx); + while (pTbData || pBlockIdx) { + if (pTbData && pBlockIdx) { + int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx); - if (c == 0) { - // merge commit - code = tsdbMergeMemDisk(pCommitter, pTbData, pBlockIdx); - if (code) goto _err; + if (c == 0) { + code = tsdbCommitTableData(pCommitter, pTbData, pBlockIdx); + if (code) goto _err; - iTbData++; - iBlockIdx++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); - } else { - pBlockIdx = NULL; - } - } else if (c < 0) { - // commit memory data - code = tsdbCommitMemoryData(pCommitter, pTbData); - if (code) goto _err; + iBlockIdx++; + if (iBlockIdx < nBlockIdx) { + tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); + } else { + pBlockIdx = NULL; + } + iTbData++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } + } else if (c < 0) { + code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx); + if (code) goto _err; - iTbData++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + iBlockIdx++; + if (iBlockIdx < nBlockIdx) { + tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); + } else { + pBlockIdx = NULL; + } } else { - pTbData = NULL; + code = tsdbCommitTableData(pCommitter, pTbData, NULL); + if (code) goto _err; + + iTbData++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } } - } else { - // commit disk data - code = tsdbCommitDiskData(pCommitter, pBlockIdx); + } else if (pBlockIdx) { + code = tsdbCommitTableData(pCommitter, NULL, pBlockIdx); if (code) goto _err; iBlockIdx++; @@ -1252,34 +986,16 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } else { pBlockIdx = NULL; } - } - } - - // disk - while (pBlockIdx) { - // commit disk data - code = tsdbCommitDiskData(pCommitter, pBlockIdx); - if (code) goto _err; - - iBlockIdx++; - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); } else { - pBlockIdx = NULL; - } - } + code = tsdbCommitTableData(pCommitter, pTbData, NULL); + if (code) goto _err; - // memory - while (pTbData) { - // commit memory data - code = tsdbCommitMemoryData(pCommitter, pTbData); - if (code) goto _err; - - iTbData++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; + iTbData++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } } }