From 8ae2ab1c6e92d044920aa9fed3506acc9e416352 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 8 Aug 2022 05:34:01 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 31 ++++----- source/dnode/vnode/src/tsdb/tsdbCommit.c | 10 +-- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 13 ++-- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 6 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 66 ++++++++++++++----- 6 files changed, 82 insertions(+), 50 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f33b8cd51d..0cbbe4c351 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -138,22 +138,23 @@ int32_t tGetColData(uint8_t *p, SColData *pColData); #define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA)) #define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA)) -int32_t tBlockDataCreate(SBlockData *pBlockData); -void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); - -void tBlockDataReset(SBlockData *pBlockData); -int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid); -int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); -int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); -void tBlockDataClearData(SBlockData *pBlockData); - -int32_t tBlockDataCorrectSchema(SBlockData *pBlockData, SBlockData *pBlockDataFrom); -int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); -int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); +int32_t tBlockDataCreate(SBlockData *pBlockData); +void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); +int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema); +int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId); +void tBlockDataReset(SBlockData *pBlockData); +int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); +void tBlockDataClear(SBlockData *pBlockData); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); -int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData); -int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); + +#if 1 +int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); +int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); +int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); +int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData); +int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); +#endif // SDiskDataHdr int32_t tPutDiskDataHdr(uint8_t *p, void *ph); int32_t tGetDiskDataHdr(uint8_t *p, void *ph); @@ -190,7 +191,7 @@ int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol uint8_t **ppBuf); int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, int32_t nVal, SColData *pColData, uint8_t **ppBuf); -int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size); +int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck); // tsdbMemTable ============================================================================================== // SMemTable int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 44f360d597..d48125beb5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -539,7 +539,7 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { if (code) goto _err; // clear - tBlockDataClearData(pBlockData); + tBlockDataClear(pBlockData); return code; @@ -578,7 +578,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { } // clear - tBlockDataClearData(pBlockData); + tBlockDataClear(pBlockData); return code; @@ -596,7 +596,7 @@ static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, S code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBlockDataR, NULL, 0); if (code) goto _err; - tBlockDataClearData(pBlockDataW); + tBlockDataClear(pBlockDataW); int32_t iRow = 0; TSDBROW row; TSDBROW *pRow1 = tsdbTbDataIterGet(pIter); @@ -672,7 +672,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter STbData *pTbData = pIter->pTbData; SBlockData *pBlockData = &pCommitter->dWriter.bData; - tBlockDataClearData(pBlockData); + tBlockDataClear(pBlockData); TSDBROW *pRow = tsdbTbDataIterGet(pIter); while (true) { if (pRow == NULL) { @@ -741,7 +741,7 @@ static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, S STbData *pTbData = pIter->pTbData; SBlockData *pBlockData = &pCommitter->dWriter.bData; - tBlockDataClearData(pBlockData); + tBlockDataClear(pBlockData); TSDBROW *pRow = tsdbTbDataIterGet(pIter); while (true) { if (pRow == NULL) break; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index cd1e46c342..baafdb488c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1748,7 +1748,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { tBlockDataReset(&pStatus->fileBlockData); - tBlockDataClearData(&pStatus->fileBlockData); + tBlockDataClear(&pStatus->fileBlockData); code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2208,7 +2208,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn // 3. load the neighbor block, and set it to be the currently accessed file data block tBlockDataReset(&pStatus->fileBlockData); - tBlockDataClearData(&pStatus->fileBlockData); + tBlockDataClear(&pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2871,7 +2871,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); tBlockDataReset(&pStatus->fileBlockData); - tBlockDataClearData(&pStatus->fileBlockData); + tBlockDataClear(&pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { tBlockDataDestroy(&pStatus->fileBlockData, 1); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 69f39bb3c5..17ab9eff67 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -731,13 +731,14 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo int32_t nColId, SBlockData *pBlockData) { int32_t code = 0; - // TODO - tBlockDataReset(pBlockData); + ASSERT(pBlockData->suid || pBlockData->uid); + + tBlockDataClear(pBlockData); TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD; // uid + version + tskey - code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey); + code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset, &pReader->pBuf1, pBlkInfo->szKey, 1); if (code) goto _err; SDiskDataHdr hdr; uint8_t *p = pReader->pBuf1 + tGetDiskDataHdr(pReader->pBuf1, &hdr); @@ -776,8 +777,8 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo // read and decode columns if (hdr.szBlkCol > 0) { - code = - tsdbReadAndCheckFile(pFD, pBlkInfo->offset + pBlkInfo->szKey, &pReader->pBuf1, hdr.szBlkCol + sizeof(TSCKSUM)); + code = tsdbReadAndCheckFile(pFD, pBlkInfo->offset + pBlkInfo->szKey, &pReader->pBuf1, + hdr.szBlkCol + sizeof(TSCKSUM), 1); if (code) goto _err; int32_t n = 0; @@ -797,7 +798,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo } else { code = tsdbReadAndCheckFile( pFD, pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + sizeof(TSCKSUM) + blockCol.offset, &pReader->pBuf2, - blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM)); + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM), 1); code = tsdbDecmprColData(pReader->pBuf2, &blockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->pBuf3); if (code) goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 973cd1e53a..4064ad950c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -482,7 +482,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { if (code) goto _err; tBlockReset(&pWriter->blockW); - tBlockDataClearData(&pWriter->bDataW); + tBlockDataClear(&pWriter->bDataW); } pWriter->iRow++; @@ -675,7 +675,7 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { if (code) goto _err; tBlockReset(&pWriter->blockW); - tBlockDataClearData(&pWriter->bDataW); + tBlockDataClear(&pWriter->bDataW); } code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); @@ -725,7 +725,7 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { if (code) goto _err; tBlockReset(&pWriter->blockW); - tBlockDataClearData(&pWriter->bDataW); + tBlockDataClear(&pWriter->bDataW); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 039fe54f3d..92495096da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1145,31 +1145,54 @@ void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) { pBlockData->aColData = NULL; } -void tBlockDataReset(SBlockData *pBlockData) { - pBlockData->suid = 0; - pBlockData->uid = 0; - pBlockData->nRow = 0; - taosArrayClear(pBlockData->aIdx); -} - -int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid) { +int32_t tBlockDataInit(SBlockData *pBlockData, int64_t suid, int64_t uid, STSchema *pTSchema) { int32_t code = 0; ASSERT(suid || uid); - tBlockDataReset(pBlockData); pBlockData->suid = suid; pBlockData->uid = uid; + pBlockData->nRow = 0; + + taosArrayClear(pBlockData->aIdx); + for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { + STColumn *pTColumn = &pTSchema->columns[iColumn]; + + SColData *pColData; + code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); + if (code) goto _exit; + + tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) ? 1 : 0); + } + +_exit: + return code; +} + +int32_t tBlockDataInitEx(SBlockData *pBlockData, int64_t *suid, int64_t uid, SArray *aColId) { + int32_t code = 0; + + ASSERT(suid || uid); + + pBlockData->suid = suid; + pBlockData->uid = uid; + pBlockData->nRow = 0; + + taosArrayClear(pBlockData->aIdx); + if (aColId) { + int16_t lcid = -1; + for (int32_t iColId = 0; iColId < taosArrayGetSize(aColId); iColId++) { + int16_t cid = *(int16_t *)taosArrayGet(aColId, iColId); + + ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); + ASSERT(cid > lcid); + lcid = cid; - if (pTSchema) { - for (int32_t iColumn = 1; iColumn < pTSchema->numOfCols; iColumn++) { - STColumn *pTColumn = &pTSchema->columns[iColumn]; SColData *pColData; - - code = tBlockDataAddColData(pBlockData, iColumn - 1, &pColData); + code = tBlockDataAddColData(pBlockData, iColId, &pColData); if (code) goto _exit; - tColDataInit(pColData, pTColumn->colId, pTColumn->type, (pTColumn->flags & COL_SMA_ON) != 0); + tColDataInit(pColData, cid, TSDB_DATA_TYPE_NULL, -1); } } @@ -1177,7 +1200,14 @@ _exit: return code; } -void tBlockDataClearData(SBlockData *pBlockData) { +void tBlockDataReset(SBlockData *pBlockData) { + pBlockData->suid = 0; + pBlockData->uid = 0; + pBlockData->nRow = 0; + taosArrayClear(pBlockData->aIdx); +} + +void tBlockDataClear(SBlockData *pBlockData) { pBlockData->nRow = 0; for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); @@ -1869,7 +1899,7 @@ _exit: return code; } -int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size) { +int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int32_t size, int8_t toCheck) { int32_t code = 0; // alloc @@ -1894,7 +1924,7 @@ int32_t tsdbReadAndCheckFile(TdFilePtr pFD, int64_t offset, uint8_t **ppOut, int } // check - if (!taosCheckChecksumWhole(*ppOut, size)) { + if (toCheck && !taosCheckChecksumWhole(*ppOut, size)) { code = TSDB_CODE_FILE_CORRUPTED; goto _exit; }