From 44c473303f7e68ae47d39d040bb925ed100c5d71 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 27 Jun 2022 15:42:55 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 4 + source/dnode/vnode/src/tsdb/tsdbCommit.c | 345 +++++++++++------- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 3 + 3 files changed, 221 insertions(+), 131 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 925fb2633c..3844bc02cb 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -104,6 +104,8 @@ int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow); int32_t tTABLEIDCmprFn(const void *p1, const void *p2); // TSDBKEY int32_t tsdbKeyCmprFn(const void *p1, const void *p2); +#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2)) +#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2)) // KEYINFO #define tKEYINFOInit() \ ((KEYINFO){.maxKey = {.ts = TSKEY_MIN, .version = -1}, \ @@ -139,6 +141,8 @@ int32_t tColDataPCmprFn(const void *p1, const void *p2); // SBlockData #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) +#define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA)) +#define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA)) int32_t tBlockDataInit(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 855b43dea7..7828a3ca15 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -614,31 +614,39 @@ _err: return code; } -static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx, SBlock *pBlock, - TSDBKEY toKey /*not included*/, int8_t toDataOnly) { +static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlockMerge, TSDBKEY toKey, + int8_t toDataOnly) { int32_t code = 0; - SBlockData *pBlockDataFrom = &pCommitter->oBlockData; - SBlockData *pBlockDataTo = &pCommitter->nBlockData; + SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; + SBlockData *pBlockDataMerge = &pCommitter->oBlockData; + SBlockData *pBlockData = &pCommitter->nBlockData; + SBlock *pBlock = &pCommitter->nBlock; TSDBROW *pRow1; - TSDBROW *pRow2; + TSDBROW row2; + TSDBROW *pRow2 = &row2; + TSDBROW row; + TSDBROW *pRow = &row; int32_t c = 0; + TSKEY lastKey; // read SBlockData - code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, pBlockDataFrom, NULL, NULL); + code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL); if (code) goto _err; // loop to merge - tBlockDataReset(pBlockDataTo); pRow1 = tsdbTbDataIterGet(pIter); - pRow2 = &tsdbRowFromBlockData(pBlockDataFrom, 0); + *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0); ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); if (code) goto _err; + lastKey = TSKEY_MIN; + tBlockReset(pBlock); + tBlockDataReset(pBlockData); while (true) { if (pRow1 == NULL && pRow2 == NULL) { - if (pBlockDataTo->nRow == 0) { + if (pBlockData->nRow == 0) { break; } else { goto _write_block; @@ -646,43 +654,85 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB } if (pRow1 && pRow2) { - c = tsdbRowCmprFn(pRow1, pRow2); - if (c < 0) { - code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema); - if (code) goto _err; + if (tsdbRowCmprFn(pRow1, pRow2) < 0) { + *pRow = *pRow1; tsdbTbDataIterNext(pIter); pRow1 = tsdbTbDataIterGet(pIter); - // TODO - } else if (c > 0) { - code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL); - if (code) goto _err; - pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL; + if (pRow1) { + if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { + code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1)); + if (code) goto _err; + } else { + pRow1 = NULL; + } + } + } else if (tsdbRowCmprFn(pRow1, pRow2) < 0) { + *pRow = *pRow2; + + if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { + *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1); + } else { + pRow2 = NULL; + } } else { ASSERT(0); } } else if (pRow1) { - code = tBlockDataAppendRow(pBlockDataTo, pRow1, pCommitter->pTSchema); + *pRow = *pRow1; + tsdbTbDataIterNext(pIter); pRow1 = tsdbTbDataIterGet(pIter); - // TODO + if (pRow1) { + if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { + code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1)); + if (code) goto _err; + } else { + pRow1 = NULL; + } + } } else { - code = tBlockDataAppendRow(pBlockDataTo, pRow2, NULL); - if (code) goto _err; + *pRow = *pRow2; - pRow2 = pRow2->iRow + 1 < pBlockDataFrom->nRow ? &tsdbRowFromBlockData(pBlockDataFrom, pRow2->iRow + 1) : NULL; + if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { + *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1); + } else { + pRow2 = NULL; + } } - if (pBlockDataTo->nRow >= pCommitter->maxRow * 4 / 5) { - goto _write_block; + code = tBlockDataAppendRow(pBlockData, &row, pCommitter->pTSchema); + if (code) goto _err; + + pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); + pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow)); + pBlock->nRow++; + if (lastKey == TSDBROW_TS(pRow)) { + pBlock->hasDup = 1; } else { - continue; + lastKey = TSDBROW_TS(pRow); } + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; + continue; + _write_block: - tBlockDataReset(pBlockDataTo); - // TODO + if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) { + pBlock->last = 1; + } else { + pBlock->last = 0; + } + + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); + if (code) goto _err; + + lastKey = TSKEY_MIN; + tBlockReset(pBlock); + tBlockDataReset(pBlockData); } return code; @@ -695,10 +745,15 @@ _err: static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey, int8_t toDataOnly) { int32_t code = 0; TSDBROW *pRow; + SBlock *pBlock = &pCommitter->nBlock; SBlockData *pBlockData = &pCommitter->nBlockData; + TSKEY lastKey = TSKEY_MIN; + int64_t suid = pIter->pTbData->suid; + int64_t uid = pIter->pTbData->uid; - pRow = tsdbTbDataIterGet(pIter); + tBlockReset(pBlock); tBlockDataReset(pBlockData); + pRow = tsdbTbDataIterGet(pIter); while (true) { if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) { if (pBlockData->nRow > 0) { @@ -708,24 +763,53 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter } } + // update schema code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; + // append code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); if (code) goto _err; + // update + pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); + pBlock->maxVersion = TMIN(pBlock->maxVersion, TSDBROW_VERSION(pRow)); + pBlock->nRow++; + if (TSDBROW_TS(pRow) == lastKey) { + pBlock->hasDup = 1; + } else { + lastKey = TSDBROW_TS(pRow); + } + tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; + continue; _write_block: + if (!toDataOnly && pBlockData->nRow < pCommitter->minKey) { + pBlock->last = 1; + } else { + pBlock->last = 0; + } + + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, &(SBlockIdx){.suid = suid, .uid = uid}, + pBlock, pCommitter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); + if (code) goto _err; + + tBlockReset(pBlock); tBlockDataReset(pBlockData); + lastKey = TSKEY_MIN; } return code; _err: + tsdbError("vgId:%d tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -762,119 +846,120 @@ _err: } static int32_t tsdbMergeMemDisk(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *oBlockIdx) { - int32_t code = 0; - STbDataIter *pIter = &(STbDataIter){0}; - TSDBROW *pRow; + int32_t code = 0; + // STbDataIter *pIter = &(STbDataIter){0}; + // TSDBROW *pRow; - // create iter - tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); - pRow == tsdbTbDataIterGet(pIter); - if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) { - code = tsdbCommitDiskData(pCommitter, oBlockIdx); - if (code) { - goto _err; - } else { - goto _exit; - } - } + // // create iter + // tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); + // pRow == tsdbTbDataIterGet(pIter); + // if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) { + // code = tsdbCommitDiskData(pCommitter, oBlockIdx); + // if (code) { + // goto _err; + // } else { + // goto _exit; + // } + // } - // start ================== - // read - code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL); - if (code) goto _err; + // // start ================== + // // read + // code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, &pCommitter->oBlockMap, NULL); + // if (code) goto _err; - // loop to merge - // SBlockData *pBlockData = &pCommitter->nBlockData; - int32_t iBlock = 0; - int32_t nBlock = pCommitter->oBlockMap.nItem; - // SBlock *pBlockO = &pCommitter->oBlock; - SBlock *pBlock; - int32_t c; + // // loop to merge + // // SBlockData *pBlockData = &pCommitter->nBlockData; + // int32_t iBlock = 0; + // int32_t nBlock = pCommitter->oBlockMap.nItem; + // // SBlock *pBlockO = &pCommitter->oBlock; + // SBlock *pBlock; + // int32_t c; - // merge =================== - while (true) { - if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; + // // merge =================== + // while (true) { + // if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; - if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) { - if (pBlock->last) { - // merge memory data and disk data to write to .data/.last (todo) - code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - if (code) goto _err; + // if ((pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey) && pBlock) { + // if (pBlock->last) { + // // merge memory data and disk data to write to .data/.last (todo) + // code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, + // (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + // if (code) goto _err; - pRow = tsdbTbDataIterGet(pIter); - iBlock++; - } else { - c = tBlockCmprFn(&(SBlock){}, pBlock); + // pRow = tsdbTbDataIterGet(pIter); + // iBlock++; + // } else { + // c = tBlockCmprFn(&(SBlock){}, pBlock); - if (c < 0) { - // commit memory data until pBlock->minKey (not included) only to .data file (todo) - code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); - if (code) goto _err; + // if (c < 0) { + // // commit memory data until pBlock->minKey (not included) only to .data file (todo) + // code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); + // if (code) goto _err; - pRow = tsdbTbDataIterGet(pIter); - } else if (c > 0) { - // just move the block (todo) - // code = tsdbCommitTableDiskData(pCommitter, pBlock); - if (code) goto _err; + // pRow = tsdbTbDataIterGet(pIter); + // } else if (c > 0) { + // // just move the block (todo) + // // code = tsdbCommitTableDiskData(pCommitter, pBlock); + // if (code) goto _err; - iBlock++; - // TODO - } else { - int64_t nOvlp = 0; // = tsdbOvlpRows(); - if (nOvlp + pBlock->nRow <= pCommitter->maxRow) { - // add as a subblock - } else { - if (iBlock == nBlock - 1) { - // merge memory data and disk data to .data/.last file - code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - if (code) goto _err; - } else { - // merge memory data and disk data to .data file only until pBlock[1]. - code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1); - } - } + // iBlock++; + // // TODO + // } else { + // int64_t nOvlp = 0; // = tsdbOvlpRows(); + // if (nOvlp + pBlock->nRow <= pCommitter->maxRow) { + // // add as a subblock + // } else { + // if (iBlock == nBlock - 1) { + // // merge memory data and disk data to .data/.last file + // code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, + // (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + // if (code) goto _err; + // } else { + // // merge memory data and disk data to .data file only until pBlock[1]. + // code = tsdbMergeTableData(pCommitter, pIter, oBlockIdx, pBlock, (TSDBKEY){0} /*TODO*/, 1); + // } + // } - pRow = tsdbTbDataIterGet(pIter); - iBlock++; - } - } - } else if (pBlock) { - // code = tsdbCommitTableDiskData(pCommitter, pBlock); - if (code) goto _err; + // pRow = tsdbTbDataIterGet(pIter); + // iBlock++; + // } + // } + // } else if (pBlock) { + // // code = tsdbCommitTableDiskData(pCommitter, pBlock); + // if (code) goto _err; - iBlock++; - // next block - } else { - // commit only memory data until (pCommitter->maxKey, VERSION_MAX) - code = - tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); - if (code) goto _err; + // iBlock++; + // // next block + // } else { + // // commit only memory data until (pCommitter->maxKey, VERSION_MAX) + // code = + // tsdbCommitTableMemData(pCommitter, pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = + // VERSION_MIN}, 0); + // if (code) goto _err; - pRow = tsdbTbDataIterGet(pIter); - } - } + // pRow = tsdbTbDataIterGet(pIter); + // } + // } - // end ===================== - // SBlock - // code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx); - // if (code) goto _err; + // // end ===================== + // // SBlock + // // code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx); + // // if (code) goto _err; - // // SBlockIdx - // code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); - // if (code) goto _err; + // // // SBlockIdx + // // code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); + // // if (code) goto _err; -_exit: - pRow = tsdbTbDataIterGet(pIter); - if (pRow) { - pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); - } + // _exit: + // pRow = tsdbTbDataIterGet(pIter); + // if (pRow) { + // pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); + // } return code; -_err: - tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; + // _err: + // tsdbError("vgId:%d tsdb merge mem disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), + // tstrerror(code)); return code; } static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { @@ -930,7 +1015,6 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // start =========== tMapDataReset(&pCommitter->nBlockMap); SBlock *pBlock = &pCommitter->oBlock; - int32_t c; if (iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); @@ -944,19 +1028,18 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) { if (pBlock->last) { - code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, + code = tsdbMergeTableData(pCommitter, pIter, pBlock, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); iBlock++; } else { - c = tBlockCmprFn(&(SBlock){}, pBlock); + int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock); if (c > 0) { code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); if (code) goto _err; - iBlock++; iBlock++; if (iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); @@ -974,12 +1057,12 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // add as a subblock } else { if (iBlock == nBlock - 1) { - code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, + code = tsdbMergeTableData(pCommitter, pIter, pBlock, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; } else { - // code = tsdbMergeTableData(pCommitter, pIter, pBlockIdx, pBlock, pBlock[1].minKey, 1); + // code = tsdbMergeTableData(pCommitter, pIter, pBlock, pBlock[1].minKey, 1); if (code) goto _err; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 0cfb18f981..3a3d235b4e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1193,6 +1193,9 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; + pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, tBlockDataFirstKey(pBlockData)); + pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, tBlockDataLastKey(pBlockData)); + pSubBlock->nRow = pBlockData->nRow; pSubBlock->cmprAlg = cmprAlg; if (pBlock->last) {