From ed69d8ee9f12d11b5a8ff95384d805f4590da970 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 29 Jun 2022 10:05:21 +0000 Subject: [PATCH] more work --- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 223 +++++++++--------- 1 file changed, 116 insertions(+), 107 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index a3f1c92772..4f07dfd497 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -738,98 +738,135 @@ _err: return code; } +static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, + int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2) { + TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + int32_t code = 0; + int64_t offset; + int64_t size; + int64_t n; + + tBlockDataReset(pBlockData); + pBlockData->nRow = pSubBlock->nRow; + + // TSDBKEY + offset = pSubBlock->offset + sizeof(SBlockDataHdr); + size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); + code = tsdbRealloc(ppBuf1, size); + if (code) goto _err; + + n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + n = taosReadFile(pFD, *ppBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); + if (code) goto _err; + + // OTHER + SBlockCol blockCol; + SBlockCol *pBlockCol = &blockCol; + SColData *pColData; + for (int32_t iCol = 0; iCol < nCol; iCol++) { + int16_t cid = aColId[iCol]; + + if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) == + 0) { + code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); + if (code) goto _err; + + tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); + if (pBlockCol->flag == HAS_NULL) { + for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); + if (code) goto _err; + } + } else { + offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset; + size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); + + code = tsdbRealloc(ppBuf1, size); + if (code) goto _err; + + // seek + n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pFD, *ppBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); + if (code) goto _err; + } + } + } + return code; + +_err: + return code; +} + int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; + int32_t code = 0; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID); if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; - for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - int64_t offset; - int64_t size; - int64_t n; + code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2); + if (code) goto _err; - tBlockDataReset(pBlockData); - pBlockData->nRow = pSubBlock->nRow; + if (pBlock->nSubBlock > 1) { + SBlockData *pBlockData1 = &(SBlockData){0}; + SBlockData *pBlockData2 = &(SBlockData){0}; - // TSDBKEY - offset = pSubBlock->offset + sizeof(SBlockDataHdr); - size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); - code = tsdbRealloc(ppBuf1, size); - if (code) goto _err; + for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2); + if (code) goto _err; - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tBlockDataCopy(pBlockData, pBlockData2); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; + } - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); - if (code) goto _err; - - // OTHER - SBlockCol blockCol; - SBlockCol *pBlockCol = &blockCol; - SColData *pColData; - for (int32_t iCol = 0; iCol < nCol; iCol++) { - int16_t cid = aColId[iCol]; - - if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) == - 0) { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); - if (code) goto _err; - - tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); - if (pBlockCol->flag == HAS_NULL) { - for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); - if (code) goto _err; - } - } else { - offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset; - size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - - code = tsdbRealloc(ppBuf1, size); - if (code) goto _err; - - // seek - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); - if (code) goto _err; - } + code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; } } + + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); } tsdbFree(pBuf1); @@ -876,34 +913,6 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, goto _err; } - // // check - // p = *ppBuf1; - // SBlockDataHdr *pHdr = (SBlockDataHdr *)p; - // ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); - // ASSERT(pHdr->suid == pBlockIdx->suid); - // ASSERT(pHdr->uid == pBlockIdx->uid); - // p += sizeof(*pHdr); - - // if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) { - // code = TSDB_CODE_FILE_CORRUPTED; - // goto _err; - // } - // p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM)); - - // for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { - // tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); - - // ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - // if (pBlockCol->flag == HAS_NULL) continue; - - // if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) { - // code = TSDB_CODE_FILE_CORRUPTED; - // goto _err; - // } - // p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - // } - // recover pBlockData->nRow = pSubBlock->nRow; p = *ppBuf1 + sizeof(SBlockDataHdr); @@ -964,7 +973,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p SBlockData *pBlockData2 = &(SBlockData){0}; for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); + code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); if (code) { tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData2);