diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 90fdd60188..f0d93039eb 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -204,7 +204,7 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, - SBlockIdx *pBlockIdx, SBlock *pBlock); + SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg); int32_t tsdbWriteBlockSMA(SDataFWriter *pWriter, SBlockSMA *pBlockSMA, int64_t *rOffset, int64_t *rSize); SDFileSet *tsdbDataFWriterGetWSet(SDataFWriter *pWriter); @@ -213,8 +213,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf); +int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, + SBlockData *pBlockData); int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, - int16_t *aColId, int32_t nCol, uint8_t **ppBuf1, uint8_t **ppBuf2); + uint8_t **ppBuf1, uint8_t **ppBuf2); int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); @@ -365,6 +367,8 @@ typedef struct { } SBlockCol; typedef struct { + int64_t nRow; + int8_t cmprAlg; int64_t offset; int64_t ksize; int64_t bsize; @@ -379,7 +383,6 @@ struct SBlock { int32_t nRow; int8_t last; int8_t hasDup; - int8_t cmprAlg; int8_t nSubBlock; SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS]; }; @@ -404,6 +407,7 @@ struct SColData { int32_t *aOffset; int32_t nData; uint8_t *pData; + uint8_t *pBuf; }; struct SBlockData { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index d9fae3ea64..43e486cbdf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -722,8 +722,7 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) { row = tBlockDataLastRow(pBlockData); if (tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&row)) < 0) pBlock->maxKey = TSDBROW_KEY(&row); pBlock->last = pBlockData->nRow < pCommitter->minRow ? 1 : 0; - pBlock->cmprAlg = pCommitter->cmprAlg; - code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock); + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); if (code) goto _err; // Design SMA and write SMA to file @@ -760,17 +759,10 @@ _err: static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) { int32_t code = 0; SMapData *mBlockO = &pCommitter->oBlockMap; - SMapData *mBlockN = &pCommitter->nBlockMap; SBlock *pBlockO = &pCommitter->oBlock; + SMapData *mBlockN = &pCommitter->nBlockMap; SBlock *pBlockN = &pCommitter->nBlock; - SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = oBlockIdx->suid, - .uid = oBlockIdx->uid, - .maxKey = oBlockIdx->maxKey, - .minKey = oBlockIdx->minKey, - .minVersion = oBlockIdx->minVersion, - .maxVersion = oBlockIdx->maxVersion, - .offset = -1, - .size = -1}; + SBlockIdx *pBlockIdx = &(SBlockIdx){0}; SBlockData *pBlockDataO = &pCommitter->oBlockData; // read @@ -784,13 +776,12 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) if (pBlockO->last) { ASSERT(iBlock == mBlockO->nItem - 1); - code = tsdbReadBlockData(pCommitter->pReader, oBlockIdx, pBlockO, pBlockDataO, NULL, -1, NULL, NULL); + code = tsdbReadBlockData(pCommitter->pReader, oBlockIdx, pBlockO, pBlockDataO, NULL, NULL); if (code) goto _err; tBlockReset(pBlockN); pBlockN->last = 1; - pBlockN->cmprAlg = pBlockO->cmprAlg; - code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN); + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN, pCommitter->cmprAlg); if (code) goto _err; code = tMapDataPutItem(mBlockN, pBlockN, tPutBlock); @@ -802,6 +793,7 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) } // SBlock + *pBlockIdx = *oBlockIdx; code = tsdbWriteBlock(pCommitter->pWriter, mBlockN, NULL, pBlockIdx); if (code) goto _err; @@ -812,7 +804,7 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) return code; _err: - tsdbError("vgId:%d tsdb Commit disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb commit disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 782a80d84c..b8bef3211b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -591,13 +591,112 @@ _err: return code; } -int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, - int16_t *aColId, int32_t nCol, uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; +int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, + SBlockData *pBlockData) { + int32_t code = 0; + TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; // TODO return code; } +int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, + uint8_t **ppBuf1, uint8_t **ppBuf2) { + int32_t code = 0; + TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; + SBlockCol *pBlockCol = &(SBlockCol){}; + + if (!ppBuf1) ppBuf1 = &pBuf1; + if (!ppBuf2) ppBuf2 = &pBuf2; + + for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + uint8_t *p; + int64_t n; + + // realloc + code = tsdbRealloc(ppBuf1, pSubBlock->bsize); + if (code) goto _err; + + // seek + n = taosLSeekFile(pFD, pSubBlock->offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pFD, *ppBuf1, pSubBlock->bsize); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < pSubBlock->bsize) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + p = *ppBuf1; + SBlockDataHdr *pHdr = (SBlockDataHdr *)p; + ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); + ASSERT(pHdr->suid == pBlockIdx->suid); + ASSERT(pHdr->uid == pBlockIdx->uid); + p += sizeof(*pHdr); + + if (!taosCheckChecksumWhole(p, pSubBlock->ksize)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + p += pSubBlock->ksize; + + for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { + tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); + + ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); + + if (pBlockCol->flag == HAS_NULL) continue; + + if (!taosCheckChecksumWhole(p, pBlockCol->size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + p += pBlockCol->size; + } + + // recover + pBlockData->nRow = pSubBlock->nRow; + p = *ppBuf1 + sizeof(*pHdr); + + code = tsdbRealloc((uint8_t **)&pBlockData->aVersion, pBlockData->nRow * sizeof(int64_t)); + if (code) goto _err; + code = tsdbRealloc((uint8_t **)&pBlockData->aTSKEY, pBlockData->nRow * sizeof(TSKEY)); + if (code) goto _err; + p += pSubBlock->ksize; + + for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { + tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); + + if (pBlockCol->flag == HAS_NONE) { + // All NULL value + } else { + // decompress + p += pBlockCol->size; + } + } + } + + if (pBuf1) tsdbFree(pBuf1); + if (pBuf2) tsdbFree(pBuf2); + return code; + +_err: + tsdbError("vgId:%d tsdb read block data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + if (pBuf1) tsdbFree(pBuf1); + if (pBuf2) tsdbFree(pBuf2); + return code; +} + int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) { int32_t code = 0; // TODO @@ -1000,7 +1099,7 @@ _err: } int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, - SBlockIdx *pBlockIdx, SBlock *pBlock) { + SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg) { int32_t code = 0; SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; SBlockCol *pBlockCol = &(SBlockCol){0}; @@ -1017,6 +1116,8 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; + pSubBlock->nRow = pBlockData->nRow; + pSubBlock->cmprAlg = cmprAlg; if (pBlock->last) { pSubBlock->offset = pWriter->wSet.fLast.size; } else { @@ -1034,7 +1135,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ // TSDBKEY pSubBlock->ksize = 0; - if (pBlock->cmprAlg == NO_COMPRESSION) { + if (cmprAlg == NO_COMPRESSION) { // TSKEY size = sizeof(TSKEY) * pBlockData->nRow; n = taosWriteFile(pFileFD, pBlockData->aTSKEY, size); @@ -1064,21 +1165,21 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ } pSubBlock->ksize += size; } else { - ASSERT(pBlock->cmprAlg == ONE_STAGE_COMP || pBlock->cmprAlg == TWO_STAGE_COMP); + ASSERT(cmprAlg == ONE_STAGE_COMP || cmprAlg == TWO_STAGE_COMP); size = (sizeof(TSKEY) + sizeof(int64_t)) * pBlockData->nRow + COMP_OVERFLOW_BYTES * 2 + sizeof(TSCKSUM); code = tsdbRealloc(ppBuf1, size); if (code) goto _err; - if (pBlock->cmprAlg == TWO_STAGE_COMP) { + if (cmprAlg == TWO_STAGE_COMP) { code = tsdbRealloc(ppBuf2, size); if (code) goto _err; } // TSKEY n = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, pBlockData->nRow, *ppBuf1, - size, pBlock->cmprAlg, *ppBuf2, size); + size, cmprAlg, *ppBuf2, size); if (n <= 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; @@ -1087,7 +1188,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ // version n = tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, - *ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, pBlock->cmprAlg, *ppBuf2, size); + *ppBuf1 + pSubBlock->ksize, size - pSubBlock->ksize, cmprAlg, *ppBuf2, size); if (n <= 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; @@ -1141,7 +1242,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ } // data - if (pBlock->cmprAlg == NO_COMPRESSION) { + if (cmprAlg == NO_COMPRESSION) { // data n = taosWriteFile(pFileFD, pColData->pData, pColData->nData); if (n < 0) { @@ -1164,14 +1265,14 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ code = tsdbRealloc(ppBuf1, size); if (code) goto _err; - if (pBlock->cmprAlg == TWO_STAGE_COMP) { + if (cmprAlg == TWO_STAGE_COMP) { code = tsdbRealloc(ppBuf2, size); if (code) goto _err; } // data n = tDataTypes[pColData->type].compFunc(pColData->pData, pColData->nData, pBlockData->nRow, *ppBuf1, size, - pBlock->cmprAlg, *ppBuf2, size); + cmprAlg, *ppBuf2, size); if (n <= 0) { code = TSDB_CODE_COMPRESS_ERROR; goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 73df7ae80f..9ed0cd37e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -118,43 +118,43 @@ _exit: int32_t tPutMapData(uint8_t *p, SMapData *pMapData) { int32_t n = 0; - int32_t maxOffset; ASSERT(pMapData->flag == TSDB_OFFSET_I32); - ASSERT(pMapData->nItem > 0); - - maxOffset = tMapDataGetOffset(pMapData, pMapData->nItem - 1); n += tPutI32v(p ? p + n : p, pMapData->nItem); - if (maxOffset <= INT8_MAX) { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8); - if (p) { - for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI8(p + n, (int8_t)tMapDataGetOffset(pMapData, iItem)); + if (pMapData->nItem) { + int32_t maxOffset = tMapDataGetOffset(pMapData, pMapData->nItem - 1); + + if (maxOffset <= INT8_MAX) { + n += tPutU8(p ? p + n : p, TSDB_OFFSET_I8); + if (p) { + for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { + n += tPutI8(p + n, (int8_t)tMapDataGetOffset(pMapData, iItem)); + } + } else { + n = n + sizeof(int8_t) * pMapData->nItem; + } + } else if (maxOffset <= INT16_MAX) { + n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16); + if (p) { + for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { + n += tPutI16(p + n, (int16_t)tMapDataGetOffset(pMapData, iItem)); + } + } else { + n = n + sizeof(int16_t) * pMapData->nItem; } } else { - n = n + sizeof(int8_t) * pMapData->nItem; - } - } else if (maxOffset <= INT16_MAX) { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I16); - if (p) { - for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI16(p + n, (int16_t)tMapDataGetOffset(pMapData, iItem)); + n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32); + if (p) { + for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { + n += tPutI32(p + n, tMapDataGetOffset(pMapData, iItem)); + } + } else { + n = n + sizeof(int32_t) * pMapData->nItem; } - } else { - n = n + sizeof(int16_t) * pMapData->nItem; - } - } else { - n += tPutU8(p ? p + n : p, TSDB_OFFSET_I32); - if (p) { - for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI32(p + n, tMapDataGetOffset(pMapData, iItem)); - } - } else { - n = n + sizeof(int32_t) * pMapData->nItem; } + n += tPutBinary(p ? p + n : p, pMapData->pData, pMapData->nData); } - n += tPutBinary(p ? p + n : p, pMapData->pData, pMapData->nData); return n; } @@ -163,23 +163,25 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData) { int32_t n = 0; n += tGetI32v(p + n, &pMapData->nItem); - n += tGetU8(p + n, &pMapData->flag); - pMapData->pOfst = p + n; - switch (pMapData->flag) { - case TSDB_OFFSET_I8: - n = n + sizeof(int8_t) * pMapData->nItem; - break; - case TSDB_OFFSET_I16: - n = n + sizeof(int16_t) * pMapData->nItem; - break; - case TSDB_OFFSET_I32: - n = n + sizeof(int32_t) * pMapData->nItem; - break; + if (pMapData->nItem) { + n += tGetU8(p + n, &pMapData->flag); + pMapData->pOfst = p + n; + switch (pMapData->flag) { + case TSDB_OFFSET_I8: + n = n + sizeof(int8_t) * pMapData->nItem; + break; + case TSDB_OFFSET_I16: + n = n + sizeof(int16_t) * pMapData->nItem; + break; + case TSDB_OFFSET_I32: + n = n + sizeof(int32_t) * pMapData->nItem; + break; - default: - ASSERT(0); + default: + ASSERT(0); + } + n += tGetBinary(p + n, &pMapData->pData, &pMapData->nData); } - n += tGetBinary(p + n, &pMapData->pData, &pMapData->nData); return n; } @@ -330,8 +332,9 @@ void tBlockReset(SBlock *pBlock) { pBlock->nRow = 0; pBlock->last = -1; pBlock->hasDup = 0; - pBlock->cmprAlg = -1; for (int8_t iSubBlock = 0; iSubBlock < TSDB_MAX_SUBBLOCKS; iSubBlock++) { + pBlock->aSubBlock[iSubBlock].nRow = 0; + pBlock->aSubBlock[iSubBlock].cmprAlg = -1; pBlock->aSubBlock[iSubBlock].offset = -1; pBlock->aSubBlock[iSubBlock].ksize = -1; pBlock->aSubBlock[iSubBlock].bsize = -1; @@ -357,9 +360,10 @@ int32_t tPutBlock(uint8_t *p, void *ph) { n += tPutI32v(p ? p + n : p, pBlock->nRow); n += tPutI8(p ? p + n : p, pBlock->last); n += tPutI8(p ? p + n : p, pBlock->hasDup); - n += tPutI8(p ? p + n : p, pBlock->cmprAlg); n += tPutI8(p ? p + n : p, pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].nRow); + n += tPutI8(p ? p + n : p, pBlock->aSubBlock[iSubBlock].cmprAlg); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize); n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize); @@ -380,9 +384,10 @@ int32_t tGetBlock(uint8_t *p, void *ph) { n += tGetI32v(p + n, &pBlock->nRow); n += tGetI8(p + n, &pBlock->last); n += tGetI8(p + n, &pBlock->hasDup); - n += tGetI8(p + n, &pBlock->cmprAlg); n += tGetI8(p + n, &pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].nRow); + n += tGetI8(p + n, &pBlock->aSubBlock[iSubBlock].cmprAlg); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize); n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize);