diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 3844bc02cb..140735a947 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -121,6 +121,7 @@ int32_t tGetBlockCol(uint8_t *p, void *ph); #define tBlockInit() ((SBlock){0}) void tBlockReset(SBlock *pBlock); void tBlockClear(SBlock *pBlock); +int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest); int32_t tPutBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); @@ -164,6 +165,7 @@ void tsdbFree(uint8_t *pBuf); #define tMapDataInit() ((SMapData){0}) void tMapDataReset(SMapData *pMapData); void tMapDataClear(SMapData *pMapData); +int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 65dc2203c2..386b93309d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -342,33 +342,6 @@ _err: return code; } -// static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) { -// int32_t nRow = 0; -// TSDBROW *pRow; -// TSDBKEY key; -// int32_t c = 0; -// STbDataIter iter = *pIter; - -// iter.pRow = NULL; -// while (true) { -// pRow = tsdbTbDataIterGet(pIter); - -// if (pRow == NULL) break; -// key = tsdbRowKey(pRow); - -// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); -// if (c == 0) { -// nRow++; -// } else if (c > 0) { -// break; -// } else { -// ASSERT(0); -// } -// } - -// return nRow; -// } - static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -981,6 +954,82 @@ _err: return code; } +static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) { + int32_t nRow = 0; + TSDBROW *pRow; + TSDBKEY key; + int32_t c = 0; + STbDataIter iter = *pIter; + + iter.pRow = NULL; + while (true) { + pRow = tsdbTbDataIterGet(pIter); + + if (pRow == NULL) break; + key = TSDBROW_KEY(pRow); + + c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock); + if (c == 0) { + nRow++; + } else if (c > 0) { + break; + } else { + ASSERT(0); + } + } + + return nRow; +} + +static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { + int32_t code = 0; + SBlockData *pBlockData = &pCommitter->nBlockData; + SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; + TSDBROW *pRow; + + tBlockDataReset(pBlockData); + pRow = tsdbTbDataIterGet(pIter); + code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + while (true) { + if (pRow) break; + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow = tsdbTbDataIterGet(pIter); + if (pRow) { + int32_t c = tBlockCmprFn(&(SBlock){}, pBlock); + + if (c == 0) { + code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + } else if (c > 0) { + pRow = NULL; + } else { + ASSERT(0); + } + } + } + + // write as a subblock + code = tBlockCopy(pBlock, &pCommitter->nBlock); + if (code) goto _err; + + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock, + pCommitter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { int32_t code = 0; STbDataIter *pIter = &(STbDataIter){0}; @@ -1070,19 +1119,26 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; } else { // merge memory and disk - int64_t nOvlp = 0; // (todo) + int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock); + ASSERT(nOvlp); if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { - // add as a subblock + code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); + if (code) goto _err; } else { - if (iBlock == nBlock - 1) { - code = tsdbMergeTableData(pCommitter, pIter, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}; + int8_t toDataOnly = 0; - if (code) goto _err; - } else { - // code = tsdbMergeTableData(pCommitter, pIter, pBlock, pBlock[1].minKey, 1); - if (code) goto _err; + if (iBlock < nBlock - 1) { + toDataOnly = 1; + + SBlock nextBlock = {0}; + tBlockReset(&nextBlock); + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock); + toKey = nextBlock.minKey; } + + code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly); + if (code) goto _err; } pRow = tsdbTbDataIterGet(pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index c529a04280..4d47e10f44 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -35,6 +35,39 @@ void tMapDataClear(SMapData *pMapData) { } } +int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest) { + int32_t code = 0; + int32_t size; + + pMapDataDest->nItem = pMapDataSrc->nItem; + pMapDataDest->flag = pMapDataSrc->flag; + + switch (pMapDataDest->flag) { + case TSDB_OFFSET_I32: + size = sizeof(int32_t) * pMapDataDest->nItem; + break; + case TSDB_OFFSET_I16: + size = sizeof(int16_t) * pMapDataDest->nItem; + break; + case TSDB_OFFSET_I8: + size = sizeof(int8_t) * pMapDataDest->nItem; + break; + default: + ASSERT(0); + } + code = tsdbRealloc(&pMapDataDest->pOfst, size); + if (code) goto _exit; + memcpy(pMapDataDest->pOfst, pMapDataSrc->pOfst, size); + + pMapDataDest->nData = pMapDataSrc->nData; + code = tsdbRealloc(&pMapDataDest->pData, pMapDataDest->nData); + if (code) goto _exit; + memcpy(pMapDataDest->pData, pMapDataSrc->pData, pMapDataDest->nData); + +_exit: + return code; +} + int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { int32_t code = 0; int32_t offset = pMapData->nData; @@ -369,6 +402,32 @@ void tBlockClear(SBlock *pBlock) { } } +int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) { + int32_t code = 0; + + pBlockDest->minKey = pBlockSrc->minKey; + pBlockDest->maxKey = pBlockSrc->maxKey; + pBlockDest->minVersion = pBlockSrc->minVersion; + pBlockDest->maxVersion = pBlockSrc->maxVersion; + pBlockDest->nRow = pBlockSrc->nRow; + pBlockDest->last = pBlockSrc->last; + pBlockDest->hasDup = pBlockSrc->hasDup; + pBlockDest->nSubBlock = pBlockSrc->nSubBlock; + for (int32_t iSubBlock = 0; iSubBlock < pBlockSrc->nSubBlock; iSubBlock++) { + pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow; + pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg; + pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset; + pBlockDest->aSubBlock[iSubBlock].vsize = pBlockSrc->aSubBlock[iSubBlock].vsize; + pBlockDest->aSubBlock[iSubBlock].ksize = pBlockSrc->aSubBlock[iSubBlock].ksize; + pBlockDest->aSubBlock[iSubBlock].bsize = pBlockSrc->aSubBlock[iSubBlock].bsize; + code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol); + if (code) goto _exit; + } + +_exit: + return code; +} + int32_t tPutBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph;