From 40dd9564f2fd60a47b4008ec800c1de9ef0f3980 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 03:34:55 +0000 Subject: [PATCH 1/2] more work --- source/dnode/vnode/src/inc/tsdb.h | 1 - source/dnode/vnode/src/tsdb/tsdbCommit.c | 113 ++++++------------ source/dnode/vnode/src/tsdb/tsdbFS.c | 3 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 21 +++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 2 +- 5 files changed, 61 insertions(+), 79 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9c32b4fccb..71f31494fc 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -138,7 +138,6 @@ void tColDataClear(void *ph); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); -int32_t tColDataPCmprFn(const void *p1, const void *p2); // SBlockData #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 5fc36cf9be..2e6773e8e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -415,10 +415,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB TSDBROW *pRow1; TSDBROW row2; TSDBROW *pRow2 = &row2; - TSDBROW row; - TSDBROW *pRow = &row; - int32_t c = 0; - TSKEY lastKey; // read SBlockData code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL); @@ -432,7 +428,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); if (code) goto _err; - lastKey = TSKEY_MIN; tBlockReset(pBlock); tBlockDataReset(pBlockData); while (true) { @@ -445,68 +440,56 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB } if (pRow1 && pRow2) { - if (tsdbRowCmprFn(pRow1, pRow2) < 0) { - *pRow = *pRow1; - - tsdbTbDataIterNext(pIter); - pRow1 = tsdbTbDataIterGet(pIter); - - if (pRow1) { - if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { - code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1)); - if (code) goto _err; - } else { - pRow1 = NULL; - } - } - } else if (tsdbRowCmprFn(pRow1, pRow2) > 0) { - *pRow = *pRow2; - - if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { - *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1); - } else { - pRow2 = NULL; - } + int32_t c = tsdbRowCmprFn(pRow1, pRow2); + if (c < 0) { + goto _append_mem_row; + } else if (c > 0) { + goto _append_block_row; } else { ASSERT(0); } } else if (pRow1) { - *pRow = *pRow1; - - tsdbTbDataIterNext(pIter); - pRow1 = tsdbTbDataIterGet(pIter); - if (pRow1) { - if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { - code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1)); - if (code) goto _err; - } else { - pRow1 = NULL; - } - } + goto _append_mem_row; } else { - *pRow = *pRow2; - - if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { - *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1); - } else { - pRow2 = NULL; - } + goto _append_block_row; } - code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); + _append_mem_row: + code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->pTSchema); if (code) goto _err; - pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); - pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow)); - pBlock->nRow++; - if (lastKey == TSDBROW_TS(pRow)) { - pBlock->hasDup = 1; - } else { - lastKey = TSDBROW_TS(pRow); + tsdbTbDataIterNext(pIter); + pRow1 = tsdbTbDataIterGet(pIter); + if (pRow1) { + if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) { + code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); + if (code) goto _err; + } else { + pRow1 = NULL; + } } - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; - continue; + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + goto _write_block; + } else { + continue; + } + + _append_block_row: + code = tBlockDataAppendRow(pBlockData, pRow1, NULL); + if (code) goto _err; + + if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { + *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1); + } else { + pRow2 = NULL; + } + + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + goto _write_block; + } else { + continue; + } _write_block: if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) { @@ -521,7 +504,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); if (code) goto _err; - lastKey = TSKEY_MIN; tBlockReset(pBlock); tBlockDataReset(pBlockData); } @@ -538,7 +520,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter TSDBROW *pRow; SBlock *pBlock = &pCommitter->nBlock; SBlockData *pBlockData = &pCommitter->nBlockData; - TSKEY lastKey = TSKEY_MIN; int64_t suid = pIter->pTbData->suid; int64_t uid = pIter->pTbData->uid; @@ -563,16 +544,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); if (code) goto _err; - // update - pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); - pBlock->maxVersion = TMIN(pBlock->maxVersion, TSDBROW_VERSION(pRow)); - pBlock->nRow++; - if (TSDBROW_TS(pRow) == lastKey) { - pBlock->hasDup = 1; - } else { - lastKey = TSDBROW_TS(pRow); - } - tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL; @@ -596,7 +567,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter tBlockReset(pBlock); tBlockDataReset(pBlockData); - lastKey = TSKEY_MIN; } return code; @@ -614,12 +584,7 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S if (code) goto _err; tBlockReset(&pCommitter->nBlock); - pCommitter->nBlock.minKey = pBlock->minKey; - pCommitter->nBlock.maxKey = pBlock->maxKey; - pCommitter->nBlock.minVersion = pBlock->minVersion; - pCommitter->nBlock.nRow = pBlock->nRow; pCommitter->nBlock.last = pBlock->last; - pCommitter->nBlock.hasDup = pBlock->hasDup; code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock, pCommitter->cmprAlg); if (code) goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index c2af9c53e4..d498fa71ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -176,7 +176,8 @@ static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet if (pFrom && pTo) { // head if (tsdbFileIsSame(pFrom, pTo, TSDB_HEAD_FILE)) { - ASSERT(0); + ASSERT(pFrom->fHead.size == pTo->fHead.size); + ASSERT(pFrom->fHead.offset == pTo->fHead.offset); } else { tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname); taosRemoveFile(fname); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 07e4e0b8ba..3eb00f2eb3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1220,8 +1220,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; - pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, tBlockDataFirstKey(pBlockData)); - pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, tBlockDataLastKey(pBlockData)); + TSKEY lastKey = TSKEY_MIN; + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + TSDBKEY key = TSDBROW_KEY(&tsdbRowFromBlockData(pBlockData, iRow)); + if (iRow == 0) { + pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, key); + } + + if (iRow == pBlockData->nRow - 1) { + pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, key); + } + + pBlock->minVersion = TMIN(pBlock->minVersion, key.version); + pBlock->maxVersion = TMAX(pBlock->maxVersion, key.version); + if (key.ts == lastKey) { + pBlock->hasDup = 1; + } + lastKey = key.ts; + } + pBlock->nRow += pBlockData->nRow; pSubBlock->nRow = pBlockData->nRow; pSubBlock->cmprAlg = cmprAlg; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index fa250831bf..58397b1a9b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1085,7 +1085,7 @@ _exit: return code; } -int32_t tColDataPCmprFn(const void *p1, const void *p2) { +static FORCE_INLINE int32_t tColDataPCmprFn(const void *p1, const void *p2) { SColData *pColData1 = *(SColData **)p1; SColData *pColData2 = *(SColData **)p2; From 1d012a47e51a7c80556343a0edafcd1f18493a99 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 03:43:33 +0000 Subject: [PATCH 2/2] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 2 +- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 2e6773e8e0..6f4dacdf41 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -476,7 +476,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB } _append_block_row: - code = tBlockDataAppendRow(pBlockData, pRow1, NULL); + code = tBlockDataAppendRow(pBlockData, pRow2, NULL); if (code) goto _err; if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 3eb00f2eb3..a8a5cb076e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -107,7 +107,7 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf *(SBlockDataHdr *)(*ppBuf) = hdr; n += sizeof(hdr); for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) { - size += tPutDelData(*ppBuf + n, taosArrayGet(aDelData, iDelData)); + n += tPutDelData(*ppBuf + n, taosArrayGet(aDelData, iDelData)); } taosCalcChecksumAppend(0, *ppBuf, size);