From d5c2d5941f4bf392225d18318b0115a3039cc557 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 5 Aug 2022 05:23:50 +0000 Subject: [PATCH] more work --- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 234 +++++++++++++++--- source/dnode/vnode/src/tsdb/tsdbUtil.c | 8 +- 2 files changed, 197 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 2776ac559e..bc4656278c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -729,21 +729,6 @@ _err: return code; } -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2) { - int32_t code = 0; - // TODO - return code; -} - -int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2) { - int32_t code = 0; - ASSERT(0); - // TODO - return code; -} - static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SSubBlock *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { int32_t code = 0; int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); @@ -892,20 +877,25 @@ _err: return code; } -static int32_t tsdbReadBlockCol(SSubBlock *pSubBlock, uint8_t *p, SArray *aBlockCol) { +static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SBlockDataHdr *pHdr, SArray *aBlockCol) { int32_t code = 0; int32_t n = 0; SBlockCol blockCol; SBlockCol *pBlockCol = &blockCol; - if (!taosCheckChecksumWhole(p, pSubBlock->szBlockCol + sizeof(TSCKSUM))) { + // checksum + if (!taosCheckChecksumWhole(pBuf, szBlockCol + sizeof(TSCKSUM))) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } + // hdr + *pHdr = *(SBlockDataHdr *)pBuf; n += sizeof(SBlockDataHdr); - while (n < pSubBlock->szBlockCol) { - n += tGetBlockCol(p + n, pBlockCol); + + // aBlockCol + while (n < szBlockCol) { + n += tGetBlockCol(pBuf + n, pBlockCol); if (taosArrayPush(aBlockCol, pBlockCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -913,7 +903,7 @@ static int32_t tsdbReadBlockCol(SSubBlock *pSubBlock, uint8_t *p, SArray *aBlock } } - ASSERT(n == pSubBlock->szBlockCol); + ASSERT(n == szBlockCol); return code; @@ -921,10 +911,48 @@ _err: return code; } +static int32_t tsdbReadDataArray(uint8_t *pInput, int32_t szInput, int32_t nEle, int8_t type, int8_t cmprAlg, + uint8_t **ppOut, uint8_t **ppBuf) { + int32_t code = 0; + int32_t size; + + // size + if (IS_VAR_DATA_TYPE(type)) { + size = nEle; + } else { + size = tDataTypes[type].bytes * nEle; + } + + // alloc + code = tRealloc(ppOut, size); + if (code) goto _exit; + + // decode + if (cmprAlg == NO_COMPRESSION) { + ASSERT(szInput == size); + memcpy(*ppOut, pInput, size); + } else { + if (cmprAlg == TWO_STAGE_COMP) { + code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES); + if (code) goto _exit; + + int32_t n = + tDataTypes[type].decompFunc(pInput, szInput, nEle, *ppOut, size, cmprAlg, *ppBuf, size + COMP_OVERFLOW_BYTES); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _exit; + } + } + } + +_exit: + return code; +} + static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + TdFilePtr pFD = pReader->pDataFD; SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; SArray *aBlockCol = NULL; int32_t code = 0; @@ -974,7 +1002,7 @@ static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, S goto _err; } - code = tsdbReadBlockCol(pSubBlock, *ppBuf1, aBlockCol); + code = tsdbReadBlockCol(*ppBuf1, pSubBlock->szBlock, NULL /*todo*/, aBlockCol); if (code) goto _err; code = tsdbReadBlockDataKey(pBlockData, pSubBlock, *ppBuf1 + pSubBlock->szBlockCol + sizeof(TSCKSUM), ppBuf2); @@ -1093,13 +1121,13 @@ _err: return code; } -static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, - SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { +static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlock *pBlock, int32_t iSubBlock, SBlockData *pBlockData, + uint8_t **ppBuf1, uint8_t **ppBuf2) { int32_t code = 0; uint8_t *p; int64_t size; int64_t n; - TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + TdFilePtr pFD = pReader->pDataFD; SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; SArray *aBlockCol = NULL; @@ -1175,20 +1203,18 @@ _err: return code; } -int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, - uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - int32_t iSubBlock; +int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2) { + int32_t code = 0; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; // read the first sub-block - iSubBlock = 0; - code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); + int32_t iSubBlock = 0; + code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); if (code) goto _err; // read remain block data and do merg @@ -1199,7 +1225,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p tBlockDataInit(pBlockData1); tBlockDataInit(pBlockData2); for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); + code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); if (code) { tBlockDataClear(pBlockData1, 1); tBlockDataClear(pBlockData2, 1); @@ -1230,14 +1256,138 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p ASSERT(tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&tBlockDataFirstRow(pBlockData))) == 0); ASSERT(tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&tBlockDataLastRow(pBlockData))) == 0); - if (pBuf1) tFree(pBuf1); - if (pBuf2) tFree(pBuf2); + tFree(pBuf1); + tFree(pBuf2); return code; _err: tsdbError("vgId:%d, tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - if (pBuf1) tFree(pBuf1); - if (pBuf2) tFree(pBuf2); + tFree(pBuf1); + tFree(pBuf2); + return code; +} + +int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2) { + int32_t code = 0; + + tBlockDataReset(pBlockData); + + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; + if (!ppBuf1) ppBuf1 = &pBuf1; + if (!ppBuf2) ppBuf2 = &pBuf2; + + // realloc + code = tRealloc(ppBuf1, pBlockL->szBlock); + if (code) goto _err; + + // seek + int64_t n = taosLSeekFile(pReader->pLastFD, pBlockL->offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pReader->pLastFD, *ppBuf1, pBlockL->szBlock); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < pBlockL->szBlock) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // decode block col + SBlockDataHdr hdr; + SArray *aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); + uint8_t *p = *ppBuf1; + if (aBlockCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tsdbReadBlockCol(p, pBlockL->szBlockCol, &hdr, aBlockCol); + if (code) goto _err; + p += pBlockL->szBlockCol + sizeof(TSCKSUM); + + // checksum + if (!taosCheckChecksumWhole(p, pBlockL->szUid + pBlockL->szVer + pBlockL->szTSKEY + sizeof(TSCKSUM))) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // UID + if (hdr.uid == 0) { + code = tsdbReadDataArray(p, pBlockL->szUid, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg, + (uint8_t **)&pBlockData->aUid, ppBuf2); + if (code) goto _err; + } else { + ASSERT(pBlockL->szUid == 0); + } + p += pBlockL->szUid; + + // VERSION + code = tsdbReadDataArray(p, pBlockL->szVer, pBlockL->nRow, TSDB_DATA_TYPE_BIGINT, pBlockL->cmprAlg, + (uint8_t **)&pBlockData->aVersion, ppBuf2); + if (code) goto _err; + p += pBlockL->szVer; + + // TSKEY + code = tsdbReadDataArray(p, pBlockL->szTSKEY, pBlockL->nRow, TSDB_DATA_TYPE_TIMESTAMP, pBlockL->cmprAlg, + (uint8_t **)&pBlockData->aTSKEY, ppBuf2); + if (code) goto _err; + p += pBlockL->szTSKEY; + p += sizeof(TSCKSUM); + + // COLUMN + code = tBlockDataSetSchema(pBlockData, NULL, hdr.suid, hdr.uid); + if (code) goto _err; + + for (int32_t iBlockCol = 0; iBlockCol < taosArrayGetSize(aBlockCol); iBlockCol++) { + SBlockCol *pBlockCol = (SBlockCol *)taosArrayGet(aBlockCol, iBlockCol); + SColData *pColData; + + // checksum + if (!taosCheckChecksumWhole(p, pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM))) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // add SColData + code = tBlockDataAddColData(pBlockData, iBlockCol, &pColData); + if (code) goto _err; + tColDataInit(pColData, pBlockCol->cid, pBlockCol->type, pBlockCol->smaOn); + pColData->nVal = pBlockL->nRow; + pColData->flag = pBlockCol->flag; + + // bitmap + if (pBlockCol->szBitmap) { + code = tsdbReadDataArray(p, pBlockCol->szBitmap, ); + if (code) goto _err; + } + p += pBlockCol->szBitmap; + + // offset + if (pBlockCol->szOffset) { + code = tsdbReadDataArray(p, pBlockCol->szOffset, ); + if (code) goto _err; + } + p += pBlockCol->szOffset; + + // value + pColData->nData = pBlockCol->szOrigin; + if (pColData->nData) { + // TODO + } + } + + taosArrayDestroy(aBlockCol); + + return code; + +_err: + tsdbError("vgId:%d tsdb read last block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } @@ -1832,10 +1982,12 @@ static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, int8_t // VALUE if (pColData->flag != (HAS_NULL | HAS_NONE)) { + ASSERT(pColData->nData); code = tsdbWriteDataArray(pColData->pData, pColData->nData, pColData->type, cmprAlg, &pBlockCol->szValue, ppBuf1, nBuf1 + n, ppBuf2); if (code) goto _err; } else { + ASSERT(pColData->nData == 0); pBlockCol->szValue = 0; } n += pBlockCol->szValue; @@ -2073,11 +2225,11 @@ int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock pBlockL->minUid = pBlockData->aUid[0]; pBlockL->maxUid = pBlockData->aUid[pBlockData->nRow - 1]; } + pBlockL->minVer = VERSION_MAX; + pBlockL->maxVer = VERSION_MIN; pBlockL->nRow = pBlockData->nRow; pBlockL->offset = pWriter->fLast.size; pBlockL->cmprAlg = cmprAlg; - pBlockL->minVer = VERSION_MAX; - pBlockL->maxVer = VERSION_MIN; for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { pBlockL->minVer = TMIN(pBlockL->minVer, pBlockData->aVersion[iRow]); pBlockL->maxVer = TMAX(pBlockL->maxVer, pBlockData->aVersion[iRow]); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index c17a7f4a09..1ae241f0c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -348,11 +348,11 @@ int32_t tPutBlockCol(uint8_t *p, void *ph) { n += tPutI8(p ? p + n : p, pBlockCol->flag); if (pBlockCol->flag != HAS_NULL) { - n += tPutI32v(p ? p + n : p, pBlockCol->offset); + n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap); n += tPutI32v(p ? p + n : p, pBlockCol->szOffset); n += tPutI32v(p ? p + n : p, pBlockCol->szValue); - n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); + n += tPutI32v(p ? p + n : p, pBlockCol->offset); } return n; @@ -370,11 +370,11 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); if (pBlockCol->flag != HAS_NULL) { - n += tGetI32v(p + n, &pBlockCol->offset); + n += tGetI32v(p + n, &pBlockCol->szOrigin); n += tGetI32v(p + n, &pBlockCol->szBitmap); n += tGetI32v(p + n, &pBlockCol->szOffset); n += tGetI32v(p + n, &pBlockCol->szValue); - n += tGetI32v(p + n, &pBlockCol->szOrigin); + n += tGetI32v(p + n, &pBlockCol->offset); } return n;