From 4bfdfe89a58fdea3687c09e9051a4666e82f3997 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 27 Sep 2022 11:02:04 +0800 Subject: [PATCH] adjust api --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 3 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 7 ++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 19 +++++---- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 16 ++++++-- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 7 ++-- source/dnode/vnode/src/tsdb/tsdbUtil.c | 41 +++++++++++++++---- 7 files changed, 67 insertions(+), 28 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index c2f0d58279..55a82fd776 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -150,7 +150,7 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs); int32_t tBlockDataCreate(SBlockData *pBlockData); void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); -int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema); +int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); int32_t tBlockDataInitEx(SBlockData *pBlockData, SBlockData *pBlockDataFrom); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 32272b541c..3c145cc7ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -612,7 +612,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ tBlockDataReset(state->pBlockData); - code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema); + TABLEID tid = {.suid = state->suid, .uid = state->uid}; + code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0); if (code) goto _err; code = tsdbReadDataBlock(state->pDataFReader, &block, state->pBlockData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a619b9f2e4..45f2f8ccde 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1305,7 +1305,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { if (!pBDatal->suid && !pBDatal->uid) { ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.uid == id.uid); - code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); + TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid}; + code = tBlockDataInit(pBDatal, &tid, pCommitter->skmTable.pTSchema, NULL, 0); if (code) goto _exit; } @@ -1428,9 +1429,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { // impl code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); if (code) goto _err; - code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); + code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); if (code) goto _err; - code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); + code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0); if (code) goto _err; /* merge with data in .data file */ diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3d2c89ba02..9283df7455 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1741,7 +1741,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tsLast = getCurrentKeyInLastBlock(pLastBlockReader); } - int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; + int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); @@ -2009,12 +2009,12 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { } static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } -bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { - if (pBlockData->nRow > 0) { - ASSERT(pBlockData->nRow == pDumpInfo->totalRows); +bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { + if (pBlockData->nRow > 0) { + ASSERT(pBlockData->nRow == pDumpInfo->totalRows); } - return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); + return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); } int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, @@ -2452,7 +2452,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { code = buildComposedDataBlock(pReader); } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { tBlockDataReset(&pStatus->fileBlockData); - code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema); + TABLEID tid = {.suid = pReader->suid, .uid = pScanInfo->uid}; + code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2932,7 +2933,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn // 3. load the neighbor block, and set it to be the currently accessed file data block tBlockDataReset(&pStatus->fileBlockData); - int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pFBlock->uid, pReader->pSchema); + TABLEID tid = {.suid = pReader->suid, .uid = pFBlock->uid}; + int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3684,7 +3686,8 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); tBlockDataReset(&pStatus->fileBlockData); - int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema); + TABLEID tid = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; + int32_t code = tBlockDataInit(&pStatus->fileBlockData, &tid, pReader->pSchema, NULL, 0); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 5fe0b408b1..de59ea95d6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -926,12 +926,13 @@ _err: return code; } -static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData) { +static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData, + int32_t iStt) { int32_t code = 0; tBlockDataClear(pBlockData); - STsdbFD *pFD = pReader->pDataFD; + STsdbFD *pFD = (iStt < 0) ? pReader->pDataFD : pReader->aSttFD[iStt]; // uid + version + tskey code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey); @@ -1070,9 +1071,12 @@ _err: int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t code = 0; - code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData); + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData, -1); if (code) goto _err; + ASSERT(pDataBlk->nSubBlock == 1); + +#if 0 if (pDataBlk->nSubBlock > 1) { SBlockData bData1; SBlockData bData2; @@ -1113,6 +1117,7 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData tBlockDataDestroy(&bData1, 1); tBlockDataDestroy(&bData2, 1); } +#endif return code; @@ -1124,6 +1129,10 @@ _err: int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) { int32_t code = 0; + code = tsdbReadBlockDataImpl(pReader, &pSttBlk->bInfo, pBlockData, iStt); + if (code) goto _err; + +#if 0 // alloc code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock); if (code) goto _err; @@ -1136,6 +1145,7 @@ int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); if (code) goto _err; +#endif return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 99e88a442c..9344581894 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -319,7 +319,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable); if (code) goto _err; - code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema); + code = tBlockDataInit(pBlockData, &id, pReader->skmTable.pTSchema, NULL, 0); if (code) goto _err; while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) { @@ -715,7 +715,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI if (code) goto _err; tMapDataReset(&pWriter->dWriter.mDataBlk); - code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema); + code = tBlockDataInit(&pWriter->dWriter.bData, pId, pWriter->skmTable.pTSchema, NULL, 0); if (code) goto _err; return code; @@ -1000,7 +1000,8 @@ static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable); if (code) goto _err; - code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema); + TABLEID tid = {.suid = pWriter->id.suid, .uid = pWriter->id.suid ? 0 : pWriter->id.uid}; + code = tBlockDataInit(pBData, &tid, pWriter->skmTable.pTSchema, NULL, 0); if (code) goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 8026441cbc..4999e7a49a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -948,24 +948,47 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) { pBlockData->aColData = NULL; } -int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema) { +int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) { int32_t code = 0; - ASSERT(suid || uid); + ASSERT(pId->suid || pId->uid); - pBlockData->suid = suid; - pBlockData->uid = uid; + pBlockData->suid = pId->suid; + pBlockData->uid = pId->uid; pBlockData->nRow = 0; taosArrayClear(pBlockData->aIdx); - for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { + if (aCid) { + int32_t iColumn = 1; STColumn *pTColumn = &pTSchema->columns[iColumn]; + for (int32_t iCid = 0; iCid < nCid; iCid++) { + while (pTColumn && pTColumn->colId < aCid[iCid]) { + iColumn++; + pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; + } - SColData *pColData; - code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); - if (code) goto _exit; + if (pTColumn == NULL) { + break; + } else if (pTColumn->colId == aCid[iCid]) { + SColData *pColData; + code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aIdx), &pColData); + if (code) goto _exit; + tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); - tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); + iColumn++; + pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; + } + } + } else { + for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { + STColumn *pTColumn = &pTSchema->columns[iColumn]; + + SColData *pColData; + code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); + if (code) goto _exit; + + tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); + } } _exit: