diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f3c94227a0..812bfa81ad 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -219,7 +219,7 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); -int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg); @@ -229,7 +229,7 @@ SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); -int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf); +int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 7d7a7cfc2b..2bf4f5cd73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -31,13 +31,13 @@ typedef struct { TSKEY maxKey; // commit file data SDataFReader *pReader; - SMapData oBlockIdxMap; // SMapData, read from reader - SMapData oBlockMap; // SMapData, read from reader + SArray *aBlockIdx; // SArray + SMapData oBlockMap; // SMapData, read from reader SBlock oBlock; SBlockData oBlockData; SDataFWriter *pWriter; - SMapData nBlockIdxMap; // SMapData, build by committer - SMapData nBlockMap; // SMapData + SArray *aBlockIdxN; // SArray + SMapData nBlockMap; // SMapData SBlock nBlock; SBlockData nBlockData; int64_t suid; @@ -245,9 +245,10 @@ static int32_t tsdbCommitDelImpl(SCommitter *pCommitter) { } else { goto _commit_disk_del; } + } else if (pTbData) { + goto _commit_mem_del; } else { - if (pTbData) goto _commit_mem_del; - if (pDelIdx) goto _commit_disk_del; + goto _commit_disk_del; } _commit_mem_del: @@ -326,7 +327,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->nextKey = TSKEY_MAX; // old - tMapDataReset(&pCommitter->oBlockIdxMap); + taosArrayClear(pCommitter->aBlockIdx); tMapDataReset(&pCommitter->oBlockMap); tBlockReset(&pCommitter->oBlock); tBlockDataReset(&pCommitter->oBlockData); @@ -335,12 +336,12 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; - code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL); + code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL); if (code) goto _err; } // new - tMapDataReset(&pCommitter->nBlockIdxMap); + taosArrayClear(pCommitter->aBlockIdxN); tMapDataReset(&pCommitter->nBlockMap); tBlockReset(&pCommitter->nBlock); tBlockDataReset(&pCommitter->nBlockData); @@ -627,13 +628,16 @@ _err: static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int64_t uid) { int32_t code = 0; - SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = suid, .uid = uid}; + SBlockIdx blockIdx = {.suid = suid, .uid = uid}; + SBlockIdx *pBlockIdx = &blockIdx; code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx); if (code) goto _err; - code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); - if (code) goto _err; + if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } return code; @@ -720,7 +724,8 @@ _err: static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { int32_t code = 0; - STbDataIter *pIter = &(STbDataIter){0}; + STbDataIter iter = {0}; + STbDataIter *pIter = &iter; TSDBROW *pRow; int32_t iBlock; int32_t nBlock; @@ -883,19 +888,14 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t iTbData = 0; int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); int32_t iBlockIdx = 0; - int32_t nBlockIdx = pCommitter->oBlockIdxMap.nItem; + int32_t nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx); STbData *pTbData; - SBlockIdx *pBlockIdx = &(SBlockIdx){0}; + SBlockIdx *pBlockIdx; ASSERT(nTbData > 0); pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); - } else { - pBlockIdx = NULL; - } - + pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL; while (pTbData || pBlockIdx) { if (pTbData && pBlockIdx) { int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx); @@ -918,11 +918,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { if (code) goto _err; iTbData++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } + pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; continue; _commit_table_disk_data: @@ -930,11 +926,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { if (code) goto _err; iBlockIdx++; - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); - } else { - pBlockIdx = NULL; - } + pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL; continue; _commit_table_mem_and_disk: @@ -942,17 +934,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { if (code) goto _err; iBlockIdx++; - if (iBlockIdx < nBlockIdx) { - tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); - } else { - pBlockIdx = NULL; - } + pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL; iTbData++; - if (iTbData < nTbData) { - pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - } else { - pTbData = NULL; - } + pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; continue; } @@ -967,7 +951,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; // write blockIdx - code = tsdbWriteBlockIdx(pCommitter->pWriter, &pCommitter->nBlockIdxMap, NULL); + code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL); if (code) goto _err; // update file header @@ -1051,11 +1035,11 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { int32_t code = 0; pCommitter->pReader = NULL; - pCommitter->oBlockIdxMap = tMapDataInit(); + pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); pCommitter->oBlockMap = tMapDataInit(); pCommitter->oBlock = tBlockInit(); pCommitter->pWriter = NULL; - pCommitter->nBlockIdxMap = tMapDataInit(); + pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx)); pCommitter->nBlockMap = tMapDataInit(); pCommitter->nBlock = tBlockInit(); code = tBlockDataInit(&pCommitter->oBlockData); @@ -1071,11 +1055,11 @@ _exit: } static void tsdbCommitDataEnd(SCommitter *pCommitter) { - // tMapDataClear(&pCommitter->oBlockIdxMap); + taosArrayDestroy(pCommitter->aBlockIdx); // tMapDataClear(&pCommitter->oBlockMap); // tBlockClear(&pCommitter->oBlock); // tBlockDataClear(&pCommitter->oBlockData); - // tMapDataClear(&pCommitter->nBlockIdxMap); + taosArrayDestroy(pCommitter->aBlockIdxN); // tMapDataClear(&pCommitter->nBlockMap); // tBlockClear(&pCommitter->nBlock); // tBlockDataClear(&pCommitter->nBlockData); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 56797b61af..a91c60ea5c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -747,39 +747,36 @@ _end: static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { int32_t code = 0; - SMapData blockIdxMap = {0}; - tMapDataReset(&blockIdxMap); + SArray *aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - code = tsdbReadBlockIdx(pFileReader, &blockIdxMap, NULL); + code = tsdbReadBlockIdx(pFileReader, aBlockIdx, NULL); if (code != TSDB_CODE_SUCCESS) { goto _err; } - if (blockIdxMap.nItem == 0) { + if (taosArrayGetSize(aBlockIdx) == 0) { + taosArrayClear(aBlockIdx); return TSDB_CODE_SUCCESS; } - SBlockIdx blockIndex = {0}; - for (int32_t i = 0; i < blockIdxMap.nItem; ++i) { - code = tMapDataGetItemByIdx(&blockIdxMap, i, &blockIndex, tGetBlockIdx); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + SBlockIdx *pBlockIdx; + for (int32_t i = 0; i < taosArrayGetSize(aBlockIdx); ++i) { + pBlockIdx = (SBlockIdx *)taosArrayGet(aBlockIdx, i); - if (blockIndex.suid != pReader->suid) { + if (pBlockIdx->suid != pReader->suid) { continue; } // this block belongs to a table that is not queried. - void* p = taosHashGet(pReader->status.pTableMap, &blockIndex.uid, sizeof(uint64_t)); + void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t)); if (p == NULL) { continue; } if ((ASCENDING_TRAVERSE(pReader->order) && - (blockIndex.minKey > pReader->window.ekey || blockIndex.maxKey < pReader->window.skey)) || + (pBlockIdx->minKey > pReader->window.ekey || pBlockIdx->maxKey < pReader->window.skey)) || (!ASCENDING_TRAVERSE(pReader->order) && - (blockIndex.minKey > pReader->window.skey || blockIndex.maxKey < pReader->window.ekey))) { + (pBlockIdx->minKey > pReader->window.skey || pBlockIdx->maxKey < pReader->window.ekey))) { continue; } @@ -788,15 +785,15 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock)); } - pScanInfo->blockIdx = blockIndex; - taosArrayPush(pIndexList, &blockIndex); + pScanInfo->blockIdx = *pBlockIdx; + taosArrayPush(pIndexList, pBlockIdx); } -// tMapDataClear(&blockIdxMap); + taosArrayDestroy(aBlockIdx); return TSDB_CODE_SUCCESS; _err: - tMapDataClear(&blockIdxMap); + taosArrayDestroy(aBlockIdx); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 4f07dfd497..cb8e981d2d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -519,15 +519,18 @@ _err: return code; } -int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *mBlockIdx, uint8_t **ppBuf) { - int32_t code = 0; - int64_t offset = pReader->pSet->fHead.offset; - int64_t size = pReader->pSet->fHead.size - offset; - int64_t n; - uint32_t delimiter; +int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf) { + int32_t code = 0; + int64_t offset = pReader->pSet->fHead.offset; + int64_t size = pReader->pSet->fHead.size - offset; + uint8_t *pBuf = NULL; + int64_t n; + uint32_t delimiter; + SBlockIdx blockIdx; + + if (!ppBuf) ppBuf = &pBuf; // alloc - if (!ppBuf) ppBuf = &mBlockIdx->pBuf; code = tsdbRealloc(ppBuf, size); if (code) goto _err; @@ -554,16 +557,27 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *mBlockIdx, uint8_t **p } // decode - n = 0; - n += tGetU32(*ppBuf + n, &delimiter); + n = tGetU32(*ppBuf + n, &delimiter); ASSERT(delimiter == TSDB_FILE_DLMT); - n += tGetMapData(*ppBuf + n, mBlockIdx); + + taosArrayClear(aBlockIdx); + while (n < size - sizeof(TSCKSUM)) { + n += tGetBlockIdx(*ppBuf + n, &blockIdx); + + if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + ASSERT(n + sizeof(TSCKSUM) == size); + tsdbFree(pBuf); return code; _err: tsdbError("vgId:%d read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbFree(pBuf); return code; } @@ -1257,27 +1271,31 @@ _err: return code; } -int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) { +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf) { int32_t code = 0; - int64_t size; SHeadFile *pHeadFile = &pWriter->wSet.fHead; - int64_t n; uint8_t *pBuf = NULL; + int64_t size; + int64_t n; + + if (!ppBuf) ppBuf = &pBuf; // prepare - size = 0; - size += tPutU32(NULL, TSDB_FILE_DLMT); - size = size + tPutMapData(NULL, mBlockIdx) + sizeof(TSCKSUM); + size = tPutU32(NULL, TSDB_FILE_DLMT); + for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { + size += tPutBlockIdx(NULL, taosArrayGet(aBlockIdx, iBlockIdx)); + } + size += sizeof(TSCKSUM); // alloc - if (!ppBuf) ppBuf = &pBuf; code = tsdbRealloc(ppBuf, size); if (code) goto _err; // build - n = 0; - n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); - n += tPutMapData(*ppBuf + n, mBlockIdx); + n = tPutU32(*ppBuf + n, TSDB_FILE_DLMT); + for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(aBlockIdx); iBlockIdx++) { + n += tPutBlockIdx(*ppBuf + n, taosArrayGet(aBlockIdx, iBlockIdx)); + } taosCalcChecksumAppend(0, *ppBuf, size); ASSERT(n + sizeof(TSCKSUM) == size);