diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index fcb58e91b3..0824faf390 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -654,394 +654,6 @@ _err: return code; } -#if 0 -static int32_t tsdbMergeCommitDataBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { - int32_t code = 0; - STbData *pTbData = pIter->pTbData; - SBlockData *pBlockDataR = &pCommitter->dReader.bData; - SBlockData *pBlockDataW = &pCommitter->dWriter.bData; - - code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBlockDataR); - if (code) goto _err; - - tBlockDataClear(pBlockDataW); - int32_t iRow = 0; - TSDBROW row; - TSDBROW *pRow1 = tsdbTbDataIterGet(pIter); - TSDBROW *pRow2 = &row; - *pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow); - while (pRow1 && pRow2) { - int32_t c = tsdbRowCmprFn(pRow1, pRow2); - - if (c < 0) { - code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow1)); - if (code) goto _err; - - code = tBlockDataAppendRow(pBlockDataW, pRow1, pCommitter->skmRow.pTSchema, pTbData->uid); - if (code) goto _err; - - // next - tsdbTbDataIterNext(pIter); - pRow1 = tsdbTbDataIterGet(pIter); - } else if (c > 0) { - code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid); - if (code) goto _err; - - iRow++; - if (iRow < pBlockDataR->nRow) { - *pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow); - } else { - pRow2 = NULL; - } - } else { - ASSERT(0); - } - - // check - if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter, NULL); - if (code) goto _err; - } - } - - while (pRow2) { - code = tBlockDataAppendRow(pBlockDataW, pRow2, NULL, pTbData->uid); - if (code) goto _err; - - iRow++; - if (iRow < pBlockDataR->nRow) { - *pRow2 = tsdbRowFromBlockData(pBlockDataR, iRow); - } else { - pRow2 = NULL; - } - - // check - if (pBlockDataW->nRow >= pCommitter->maxRow * 4 / 5) { - code = tsdbCommitDataBlock(pCommitter, NULL); - if (code) goto _err; - } - } - - // check - if (pBlockDataW->nRow > 0) { - code = tsdbCommitDataBlock(pCommitter, NULL); - if (code) goto _err; - } - - return code; - -_err: - tsdbError("vgId:%d, tsdb merge commit data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey) { - int32_t code = 0; - STbData *pTbData = pIter->pTbData; - SBlockData *pBlockData = &pCommitter->dWriter.bData; - - tBlockDataClear(pBlockData); - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - while (true) { - if (pRow == NULL) { - if (pBlockData->nRow > 0) { - goto _write_block; - } else { - break; - } - } - - // update schema - code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); - if (code) goto _err; - - // append - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); - if (code) goto _err; - - tsdbTbDataIterNext(pIter); - pRow = tsdbTbDataIterGet(pIter); - if (pRow) { - TSDBKEY rowKey = TSDBROW_KEY(pRow); - if (tsdbKeyCmprFn(&rowKey, &toKey) >= 0) { - pRow = NULL; - } - } - - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - _write_block: - code = tsdbCommitDataBlock(pCommitter, NULL); - if (code) goto _err; - } - } - - return code; - -_err: - tsdbError("vgId:%d, tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbGetNumOfRowsLessThan(STbDataIter *pIter, TSDBKEY key) { - int32_t nRow = 0; - - STbDataIter iter = *pIter; - while (true) { - TSDBROW *pRow = tsdbTbDataIterGet(&iter); - if (pRow == NULL) break; - - int32_t c = tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &key); - if (c < 0) { - nRow++; - tsdbTbDataIterNext(&iter); - } else if (c > 0) { - break; - } else { - ASSERT(0); - } - } - - return nRow; -} - -static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { - int32_t code = 0; - STbData *pTbData = pIter->pTbData; - SBlockData *pBlockData = &pCommitter->dWriter.bData; - - tBlockDataClear(pBlockData); - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - while (true) { - if (pRow == NULL) break; - - code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); - if (code) goto _err; - - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); - if (code) goto _err; - - tsdbTbDataIterNext(pIter); - pRow = tsdbTbDataIterGet(pIter); - if (pRow) { - TSDBKEY rowKey = TSDBROW_KEY(pRow); - if (tsdbKeyCmprFn(&rowKey, &pBlock->maxKey) > 0) { - pRow = NULL; - } - } - } - - ASSERT(pBlockData->nRow > 0 && pBlock->nRow + pBlockData->nRow <= pCommitter->maxRow); - - code = tsdbCommitDataBlock(pCommitter, pBlock); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d, tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitLastFile(SCommitter *pCommitter, STbDataIter *pIter) { - int32_t code = 0; - STbData *pTbData = pIter->pTbData; - SBlockData *pBlockData = &pCommitter->dWriter.bDatal; - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pRow = NULL; - } - - if (pRow == NULL) goto _exit; - - if (pBlockData->suid || pBlockData->uid) { - if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) { - if (pBlockData->nRow > 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } - - tBlockDataReset(pBlockData); - } - } - - if (!pBlockData->suid && !pBlockData->uid) { - code = tBlockDataInit(pBlockData, pTbData->suid, 0, pCommitter->skmTable.pTSchema); - if (code) goto _err; - } - - while (pRow) { - code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); - if (code) goto _err; - - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid); - if (code) goto _err; - - tsdbTbDataIterNext(pIter); - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pRow = NULL; - } - - if (pBlockData->nRow >= pCommitter->maxRow) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) { - int32_t code = 0; - STbData *pTbData = pIter->pTbData; - int32_t iBlock = 0; - SBlock block; - SBlock *pBlock = █ - TSDBROW *pRow = tsdbTbDataIterGet(pIter); - - if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - - while (pBlock && pRow) { - SBlock tBlock = {.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}; - int32_t c = tBlockCmprFn(pBlock, &tBlock); - - if (c < 0) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); - if (code) goto _err; - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - } else if (c > 0) { - code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey); - if (code) goto _err; - - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pRow = NULL; - } - } else { - int32_t nOvlp = tsdbGetNumOfRowsLessThan(pIter, pBlock->maxKey); - - ASSERT(nOvlp > 0); - - if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { - code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); - if (code) goto _err; - } else { - code = tsdbMergeCommitDataBlock(pCommitter, pIter, pBlock); - if (code) goto _err; - } - - // next - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pRow = NULL; - } - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - } - } - - while (pBlock) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); - if (code) goto _err; - - iBlock++; - if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - } else { - pBlock = NULL; - } - } - -_exit: - return code; - -_err: - return code; -} - -static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { - int32_t code = 0; - - ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0); - - // merge commit table data - STbDataIter iter = {0}; - TSDBROW *pRow; - - tMapDataReset(&pCommitter->dWriter.mBlock); - - tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, &iter); - pRow = tsdbTbDataIterGet(&iter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pRow = NULL; - } - if (pRow == NULL) { - if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) == 0) { - code = tMapDataCopy(&pCommitter->dReader.mBlock, &pCommitter->dWriter.mBlock); - if (code) goto _err; - } - goto _exit; - } - - code = tsdbCommitterUpdateTableSchema(pCommitter, pTbData->suid, pTbData->uid); - if (code) goto _err; - code = tBlockDataInit(&pCommitter->dReader.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); - if (code) goto _err; - code = tBlockDataInit(&pCommitter->dWriter.bData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema); - if (code) goto _err; - - // commit data - code = tsdbMergeCommitData(pCommitter, &iter); - if (code) goto _err; - - // commit last - code = tsdbCommitLastFile(pCommitter, &iter); - if (code) goto _err; - -_exit: - if (pCommitter->dWriter.mBlock.nItem > 0) { - SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid}; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); - if (code) goto _err; - - if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - pRow = tsdbTbDataIterGet(&iter); - if (pRow) { - pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - } - - return code; - -_err: - tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} -#endif - static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; @@ -1112,38 +724,9 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { code = tsdbCommitFileDataStart(pCommitter); if (code) goto _err; -#if 1 // impl code = tsdbCommitFileDataImpl(pCommitter); if (code) goto _err; -#else - // commit file data impl - for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) { - STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData); - - // move commit until current (suid, uid) - code = tsdbMoveCommitData(pCommitter, *(TABLEID *)pTbData); - if (code) goto _err; - - // commit current table data - code = tsdbCommitTableData(pCommitter, pTbData); - if (code) goto _err; - - // move next reader table data if need - if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pBlockIdx) == 0) { - code = tsdbCommitterNextTableData(pCommitter); - if (code) goto _err; - } - } - - code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}); - if (code) goto _err; - - if (pCommitter->dWriter.bDatal.nRow > 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } -#endif // commit file data end code = tsdbCommitFileDataEnd(pCommitter);