diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 3844bc02cb..140735a947 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -121,6 +121,7 @@ int32_t tGetBlockCol(uint8_t *p, void *ph); #define tBlockInit() ((SBlock){0}) void tBlockReset(SBlock *pBlock); void tBlockClear(SBlock *pBlock); +int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest); int32_t tPutBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); @@ -164,6 +165,7 @@ void tsdbFree(uint8_t *pBuf); #define tMapDataInit() ((SMapData){0}) void tMapDataReset(SMapData *pMapData); void tMapDataClear(SMapData *pMapData); +int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index fb80d1487c..386b93309d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -342,33 +342,6 @@ _err: return code; } -// static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) { -// int32_t nRow = 0; -// TSDBROW *pRow; -// TSDBKEY key; -// int32_t c = 0; -// STbDataIter iter = *pIter; - -// iter.pRow = NULL; -// while (true) { -// pRow = tsdbTbDataIterGet(pIter); - -// if (pRow == NULL) break; -// key = tsdbRowKey(pRow); - -// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); -// if (c == 0) { -// nRow++; -// } else if (c > 0) { -// break; -// } else { -// ASSERT(0); -// } -// } - -// return nRow; -// } - static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; @@ -636,7 +609,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB // loop to merge pRow1 = tsdbTbDataIterGet(pIter); *pRow2 = tsdbRowFromBlockData(pBlockDataMerge, 0); - ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); + ASSERT(pRow1 && tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0); ASSERT(tsdbKeyCmprFn(&TSDBROW_KEY(pRow2), &toKey) < 0); code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1)); if (code) goto _err; @@ -668,7 +641,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB pRow1 = NULL; } } - } else if (tsdbRowCmprFn(pRow1, pRow2) < 0) { + } else if (tsdbRowCmprFn(pRow1, pRow2) > 0) { *pRow = *pRow2; if (pRow2->iRow + 1 < pBlockDataMerge->nRow) { @@ -702,7 +675,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB } } - code = tBlockDataAppendRow(pBlockData, &row, pCommitter->pTSchema); + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); if (code) goto _err; pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); @@ -754,8 +727,9 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter tBlockReset(pBlock); tBlockDataReset(pBlockData); pRow = tsdbTbDataIterGet(pIter); + ASSERT(pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) < 0); while (true) { - if (pRow == NULL || tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) { + if (pRow == NULL) { if (pBlockData->nRow > 0) { goto _write_block; } else { @@ -783,6 +757,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); + if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL; if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; continue; @@ -979,11 +954,87 @@ _err: return code; } +static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) { + int32_t nRow = 0; + TSDBROW *pRow; + TSDBKEY key; + int32_t c = 0; + STbDataIter iter = *pIter; + + iter.pRow = NULL; + while (true) { + pRow = tsdbTbDataIterGet(pIter); + + if (pRow == NULL) break; + key = TSDBROW_KEY(pRow); + + c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock); + if (c == 0) { + nRow++; + } else if (c > 0) { + break; + } else { + ASSERT(0); + } + } + + return nRow; +} + +static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { + int32_t code = 0; + SBlockData *pBlockData = &pCommitter->nBlockData; + SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; + TSDBROW *pRow; + + tBlockDataReset(pBlockData); + pRow = tsdbTbDataIterGet(pIter); + code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + while (true) { + if (pRow) break; + code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); + if (code) goto _err; + + tsdbTbDataIterNext(pIter); + pRow = tsdbTbDataIterGet(pIter); + if (pRow) { + int32_t c = tBlockCmprFn(&(SBlock){}, pBlock); + + if (c == 0) { + code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow)); + if (code) goto _err; + } else if (c > 0) { + pRow = NULL; + } else { + ASSERT(0); + } + } + } + + // write as a subblock + code = tBlockCopy(pBlock, &pCommitter->nBlock); + if (code) goto _err; + + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock, + pCommitter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { int32_t code = 0; STbDataIter *pIter = &(STbDataIter){0}; TSDBROW *pRow; - int32_t iBlock = 0; + int32_t iBlock; int32_t nBlock; int64_t suid; int64_t uid; @@ -991,6 +1042,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (pTbData) { tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; + suid = pTbData->suid; uid = pTbData->uid; } else { @@ -1004,18 +1057,20 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl nBlock = pCommitter->oBlockMap.nItem; ASSERT(nBlock > 0); + suid = pBlockIdx->suid; uid = pBlockIdx->uid; } else { nBlock = 0; } - if ((pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) && nBlock == 0) goto _exit; + if (pRow == NULL && nBlock == 0) goto _exit; // start =========== tMapDataReset(&pCommitter->nBlockMap); SBlock *pBlock = &pCommitter->oBlock; + iBlock = 0; if (iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); } else { @@ -1024,24 +1079,28 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // merge =========== while (true) { - if (((pRow == NULL) || TSDBROW_TS(pRow) > pCommitter->maxKey) && pBlock == NULL) break; + if (pRow == NULL && pBlock == NULL) break; - if (pRow && TSDBROW_TS(pRow) <= pCommitter->maxKey && pBlock) { + if (pRow && pBlock) { if (pBlock->last) { code = tsdbMergeTableData(pCommitter, pIter, pBlock, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; iBlock++; if (iBlock < nBlock) { tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } + + ASSERT(pRow == NULL && pBlock == NULL); } else { int32_t c = tBlockCmprFn(&(SBlock){.maxKey = TSDBROW_KEY(pRow), .minKey = TSDBROW_KEY(pRow)}, pBlock); if (c > 0) { + // only disk data code = tsdbCommitTableDiskData(pCommitter, pBlock, pBlockIdx); if (code) goto _err; @@ -1052,24 +1111,43 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl pBlock = NULL; } } else if (c < 0) { + // only memory data code = tsdbCommitTableMemData(pCommitter, pIter, pBlock->minKey, 1); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; } else { - int64_t nOvlp = 0; // (todo) + // merge memory and disk + int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock); + ASSERT(nOvlp); if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) { - // add as a subblock + code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); + if (code) goto _err; } else { - if (iBlock == nBlock - 1) { - code = tsdbMergeTableData(pCommitter, pIter, pBlock, - (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0); + TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}; + int8_t toDataOnly = 0; - if (code) goto _err; - } else { - // code = tsdbMergeTableData(pCommitter, pIter, pBlock, pBlock[1].minKey, 1); - if (code) goto _err; + if (iBlock < nBlock - 1) { + toDataOnly = 1; + + SBlock nextBlock = {0}; + tBlockReset(&nextBlock); + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock); + toKey = nextBlock.minKey; } + + code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly); + if (code) goto _err; + } + + pRow = tsdbTbDataIterGet(pIter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; + iBlock++; + if (iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; } } } @@ -1089,7 +1167,8 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); - ASSERT(pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; + ASSERT(pRow == NULL); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index c529a04280..4d47e10f44 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -35,6 +35,39 @@ void tMapDataClear(SMapData *pMapData) { } } +int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest) { + int32_t code = 0; + int32_t size; + + pMapDataDest->nItem = pMapDataSrc->nItem; + pMapDataDest->flag = pMapDataSrc->flag; + + switch (pMapDataDest->flag) { + case TSDB_OFFSET_I32: + size = sizeof(int32_t) * pMapDataDest->nItem; + break; + case TSDB_OFFSET_I16: + size = sizeof(int16_t) * pMapDataDest->nItem; + break; + case TSDB_OFFSET_I8: + size = sizeof(int8_t) * pMapDataDest->nItem; + break; + default: + ASSERT(0); + } + code = tsdbRealloc(&pMapDataDest->pOfst, size); + if (code) goto _exit; + memcpy(pMapDataDest->pOfst, pMapDataSrc->pOfst, size); + + pMapDataDest->nData = pMapDataSrc->nData; + code = tsdbRealloc(&pMapDataDest->pData, pMapDataDest->nData); + if (code) goto _exit; + memcpy(pMapDataDest->pData, pMapDataSrc->pData, pMapDataDest->nData); + +_exit: + return code; +} + int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { int32_t code = 0; int32_t offset = pMapData->nData; @@ -369,6 +402,32 @@ void tBlockClear(SBlock *pBlock) { } } +int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) { + int32_t code = 0; + + pBlockDest->minKey = pBlockSrc->minKey; + pBlockDest->maxKey = pBlockSrc->maxKey; + pBlockDest->minVersion = pBlockSrc->minVersion; + pBlockDest->maxVersion = pBlockSrc->maxVersion; + pBlockDest->nRow = pBlockSrc->nRow; + pBlockDest->last = pBlockSrc->last; + pBlockDest->hasDup = pBlockSrc->hasDup; + pBlockDest->nSubBlock = pBlockSrc->nSubBlock; + for (int32_t iSubBlock = 0; iSubBlock < pBlockSrc->nSubBlock; iSubBlock++) { + pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow; + pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg; + pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset; + pBlockDest->aSubBlock[iSubBlock].vsize = pBlockSrc->aSubBlock[iSubBlock].vsize; + pBlockDest->aSubBlock[iSubBlock].ksize = pBlockSrc->aSubBlock[iSubBlock].ksize; + pBlockDest->aSubBlock[iSubBlock].bsize = pBlockSrc->aSubBlock[iSubBlock].bsize; + code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol); + if (code) goto _exit; + } + +_exit: + return code; +} + int32_t tPutBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph;