From f9fd39eb58c354509750782ca5dd58baa640c0c5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 27 Jun 2022 11:20:11 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbCommit.c | 423 +++++++++++------------ source/dnode/vnode/src/tsdb/tsdbUtil.c | 4 + 3 files changed, 212 insertions(+), 216 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 87dbaa8d53..925fb2633c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -91,6 +91,7 @@ typedef struct STsdbFSState STsdbFSState; void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow); +int32_t tsdbRowCmprFn(const void *p1, const void *p2); // SRowIter void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); SColVal *tRowIterNext(SRowIter *pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 492fcdbaa5..397e5eb180 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -369,208 +369,6 @@ _err: // return nRow; // } -// static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, -// int8_t toDataOnly) { -// int32_t code = 0; -// int32_t iRow = 0; -// int32_t nRow = 0; -// int32_t c; -// TSDBROW *pRow; -// SBlock block = tBlockInit(); -// TSDBKEY key1; -// TSDBKEY key2; - -// tBlockDataReset(&pCommitter->nBlockData); - -// // load last and merge until {pCommitter->maxKey, INT64_MAX} -// code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL); -// if (code) goto _err; - -// iRow = 0; -// nRow = pCommitter->oBlockData.nRow; -// pRow = tsdbTbDataIterGet(pIter); - -// while (true) { -// if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) { -// if (pCommitter->nBlockData.nRow > 0) { -// goto _write_block_data; -// } else { -// break; -// } -// } - -// // TODO - -// _write_block_data: -// block.last = pCommitter->nBlockData.nRow < pCommitter->minRow ? 1 : 0; -// code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block); -// if (code) goto _err; - -// code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock); -// if (code) goto _err; -// } - -// tBlockReset(&block); -// tBlockDataReset(&pCommitter->nBlockData); - -// return code; - -// _err: -// tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, -// int8_t isLastBlock) { -// int32_t code = 0; -// TSDBROW *pRow; -// TSDBKEY key; -// int32_t c; - -// if (pBlock == NULL) { // (pIter && pBlock == NULL) -// key.ts = pCommitter->maxKey; -// key.version = INT64_MAX; -// code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0); -// if (code) goto _err; -// } else if (pBlock->last) { -// // merge -// code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0); -// if (code) goto _err; -// } else { // pBlock && pBlock->last == 0 && (pIter == NULL || pIter) -// // memory -// if (pIter) { -// key.ts = pBlock->info.minKey.ts; -// key.version = pBlock->info.minKey.version - 1; -// code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1); -// if (code) goto _err; -// } - -// // merge or move block -// pRow = tsdbTbDataIterGet(pIter); -// key.ts = pRow->pTSRow->ts; -// key.version = pRow->version; - -// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); -// if (c > 0) { -// // move block -// code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); -// if (code) goto _err; -// } else if (c == 0) { -// int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock); - -// if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) { -// code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1); -// if (code) goto _err; -// } else { -// // add as a subblock -// } -// } else { -// ASSERT(0); -// } -// } - -// return code; - -// _err: -// tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - -// static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { -// int32_t code = 0; -// STbDataIter iter; -// STbDataIter *pIter = &iter; -// TSDBROW *pRow; -// int64_t suid; -// int64_t uid; -// SBlockIdx blockIdx; - -// // create iter -// if (pTbData) { -// suid = pTbData->suid; -// uid = pTbData->uid; -// tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); -// } else { -// suid = pBlockIdx->suid; -// uid = pBlockIdx->uid; -// pIter = NULL; -// } - -// // check -// pRow = tsdbTbDataIterGet(pIter); -// if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit; - -// // start ================================ -// tMapDataReset(&pCommitter->oBlockMap); -// tBlockReset(&pCommitter->oBlock); -// tBlockDataReset(&pCommitter->oBlockData); -// if (pBlockIdx) { -// code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL); -// if (code) goto _err; -// } - -// blockIdx = tBlockIdxInit(suid, uid); -// tMapDataReset(&pCommitter->nBlockMap); -// tBlockReset(&pCommitter->nBlock); -// tBlockDataReset(&pCommitter->nBlockData); - -// // impl =============================== -// int32_t iBlock = 0; -// int32_t nBlock = pCommitter->oBlockMap.nItem; - -// // merge -// pRow = tsdbTbDataIterGet(pIter); -// while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) { -// tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); -// code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1)); -// if (code) goto _err; - -// pRow = tsdbTbDataIterGet(pIter); -// iBlock++; -// } - -// // mem -// pRow = tsdbTbDataIterGet(pIter); -// while (!ROW_END(pRow, pCommitter->maxKey)) { -// code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0); -// if (code) goto _err; - -// pRow = tsdbTbDataIterGet(pIter); -// } - -// // disk -// while (iBlock < nBlock) { -// tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); - -// code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0); -// if (code) goto _err; - -// iBlock++; -// } - -// // end =============================== -// code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx); -// if (code) goto _err; - -// code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx); -// if (code) goto _err; - -// _exit: -// pRow = tsdbTbDataIterGet(pIter); -// if (pRow) { -// ASSERT(pRow->pTSRow->ts > pCommitter->maxKey); -// if (pCommitter->nextKey > pRow->pTSRow->ts) { -// pCommitter->nextKey = pRow->pTSRow->ts; -// } -// } - -// return code; - -// _err: -// tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -816,6 +614,127 @@ _err: return code; } +static int32_t tsdbMergeBlockAndMem(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock, + TSDBKEY toKey /*not included*/, int8_t toDataOnly) { + int32_t code = 0; + SBlockData *pBlockDataFrom = &pCommitter->oBlockData; + SBlockData *pBlockDataTo = &pCommitter->nBlockData; + TSDBROW *pRow1; + TSDBROW *pRow2; + int32_t c = 0; + + // read SBlockData + code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, pBlockDataFrom, NULL, NULL); + if (code) goto _err; + + // loop to merge + tBlockDataReset(pBlockDataTo); + pRow1 = tsdbTbDataIterGet(pIter); + pRow2 = &tsdbRowFromBlockData(pBlockDataFrom, 0); + ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); + ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); + code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); + if (code) goto _err; + + while (true) { + if (pRow1 == NULL && pRow2 == NULL) { + if (pBlockDataTo->nRow == 0) { + break; + } else { + goto _write_block; + } + } + + if (pRow1 && pRow2) { + c = tsdbRowCmprFn(pRow1, pRow2); + if (c < 0) { + code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow1 = tsdbTbDataIterGet(pIter); + // TODO + } else if (c > 0) { + code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL); + if (code) goto _err; + + pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL; + } else { + ASSERT(0); + } + } else if (pRow1) { + code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema); + tsdbTbDataIterNext(pIter); + pRow1 = tsdbTbDataIterGet(pIter); + // TODO + } else { + code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL); + if (code) goto _err; + + pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL; + } + + if (pBlockDataTo->nRow >= pCommitter->maxRow * 4 / 5) { + goto _write_block; + } else { + continue; + } + + _write_block: + tBlockDataReset(pBlockDataTo); + // TODO + } + + return code; + +_err: + tsdbError("vgId:%d tsdb merge block and mem failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbCommitMemoryDataImpl(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { + int32_t code = 0; + TSDBROW *pRow; + SBlockData *pBlockData = &pCommitter->nBlockData; + + pRow = tsdbTbDataIterGet(pIter); + tBlockDataReset(pBlockData); + while (true) { + if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) { + if (pBlockData->nRow > 0) { + goto _write_block; + } else { + break; + } + } + + code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow = tsdbTbDataIterGet(pIter); + + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; + + _write_block: + tBlockDataReset(pBlockData); + } + + return code; + +_err: + return code; +} + +static int32_t tsdbCommitMoveDiskBlock(SCommitter *pCommitter, SBlock *pBlock) { + int32_t code = 0; + // TODO + return code; +} + static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *oBlockIdx) { int32_t code = 0; STbDataIter *pIter = &(STbDataIter){0}; @@ -825,34 +744,106 @@ static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlock tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); pRow == tsdbTbDataIterGet(pIter); if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) { - return tsdbCommitDiskData(pCommitter, oBlockIdx); + code = tsdbCommitDiskData(pCommitter, oBlockIdx); + if (code) { + goto _err; + } else { + goto _exit; + } } + // start ================== // read code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL); if (code) goto _err; // loop to merge - SBlockData *pBlockData = &pCommitter->nBlockData; - int32_t iBlock = 0; - int32_t nBlock = pCommitter->oBlockMap.nItem; + // SBlockData *pBlockData = &pCommitter->nBlockData; + int32_t iBlock = 0; + int32_t nBlock = pCommitter->oBlockMap.nItem; + // SBlock *pBlockO = &pCommitter->oBlock; + SBlock *pBlock; + int32_t c; - tBlockDataReset(pBlockData); + // merge =================== while (true) { - if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && iBlock >= nBlock) break; + if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; + + if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) { + if (pBlock->last) { + // merge memory data and disk data to write to .data/.last (todo) + code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, + (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + if (code) goto _err; + + pRow = tsdbTbDataIterGet(pIter); + iBlock++; + } else { + c = tBlockCmprFn(&(SBlock){}, pBlock); + + if (c < 0) { + // commit memory data until pBlock->minKey (not included) only to .data file (todo) + code = tsdbCommitMemoryDataImpl(pCommitter, pIter, pBlock->minKey, 1); + if (code) goto _err; + + pRow = tsdbTbDataIterGet(pIter); + } else if (c > 0) { + // just move the block (todo) + code = tsdbCommitMoveDiskBlock(pCommitter, pBlock); + if (code) goto _err; + + iBlock++; + // TODO + } else { + int64_t nOvlp = 0; // = tsdbOvlpRows(); + if (nOvlp + pBlock->nRow <= pCommitter->maxRow) { + // add as a subblock + } else { + if (iBlock == nBlock - 1) { + // merge memory data and disk data to .data/.last file + code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, + (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + if (code) goto _err; + } else { + // merge memory data and disk data to .data file only until pBlock[1]. + code = tsdbMergeBlockAndMem(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1); + } + } + + pRow = tsdbTbDataIterGet(pIter); + iBlock++; + } + } + } else if (pBlock) { + code = tsdbCommitMoveDiskBlock(pCommitter, pBlock); + if (code) goto _err; + + iBlock++; + // next block + } else { + // commit only memory data until (pCommitter->maxKey, VERSION_MAX) + code = tsdbCommitMemoryDataImpl(pCommitter, pIter, + (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + if (code) goto _err; + + pRow = tsdbTbDataIterGet(pIter); + } } - while (iBlock < nBlock) { - /* code */ - } + // end ===================== + // // SBlock + // code = tsdbWriteBlock(pCommitter->pWriter, mBlock, NULL, pBlockIdx); + // if (code) goto _err; - // - while (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) { - /* code */ - } + // // SBlockIdx + // code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); + // if (code) goto _err; _exit: - if (pRow) pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); + pRow = tsdbTbDataIterGet(pIter); + if (pRow) { + pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); + } return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 54a90f8184..e90338538d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -634,6 +634,10 @@ int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) { return n; } +int32_t tsdbRowCmprFn(const void *p1, const void *p2) { + return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2)); +} + // SRowIter ====================================================== void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { pIter->pRow = pRow;