diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4c7dbd0a45..32037cbf19 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -230,16 +230,12 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf); -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, - SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg); - -int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); - -/* new */ int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1, uint8_t **ppBuf2, int8_t cmprAlg); int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockL *pBlockL, uint8_t **ppBuf1, uint8_t **ppBuf2, int8_t cmprAlg); + +int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); // SDataFReader int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); @@ -251,8 +247,6 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg, uint8_t **ppBuf); - -/* new */ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, @@ -401,12 +395,12 @@ typedef struct { int16_t cid; int8_t type; int8_t smaOn; - int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE - int32_t offset; + int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE + int32_t szOrigin; // original column value size (only save for variant data type) int32_t szBitmap; // bitmap size int32_t szOffset; // size of offset, only for variant-length data type int32_t szValue; // compressed column value size - int32_t szOrigin; // original column value size (only save for variant data type) + int32_t offset; } SBlockCol; typedef struct { @@ -427,7 +421,6 @@ struct SBlock { int64_t minVersion; int64_t maxVersion; int32_t nRow; - int8_t last; int8_t hasDup; int8_t nSubBlock; SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS]; @@ -435,26 +428,18 @@ struct SBlock { struct SBlockL { int64_t suid; - struct { - int64_t uid; - int64_t version; - TSKEY ts; - } minKey; - struct { - int64_t uid; - int64_t version; - TSKEY ts; - } maxKey; + int64_t minUid; + int64_t maxUid; int64_t minVer; int64_t maxVer; int32_t nRow; - int8_t cmprAlg; int64_t offset; - int32_t szBlock; + int8_t cmprAlg; int32_t szBlockCol; int32_t szUid; int32_t szVer; int32_t szTSKEY; + int32_t szBlock; }; struct SColData { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 73bac77e6d..2776ac559e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1765,91 +1765,56 @@ static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { pBlock->nRow += pBlockData->nRow; } -static int32_t tsdbWriteBlockDataKey(SSubBlock *pSubBlock, SBlockData *pBlockData, uint8_t **ppBuf1, int64_t *nDataP, - uint8_t **ppBuf2) { +static int32_t tsdbWriteDataArray(uint8_t *aData, int32_t nEle, int8_t type, int8_t cmprAlg, int32_t *rSize, + uint8_t **ppBuf1, int64_t nBuf1, uint8_t **ppBuf2) { int32_t code = 0; - int64_t size; - int64_t tsize; + int32_t size; - if (pSubBlock->cmprAlg == NO_COMPRESSION) { - pSubBlock->szVersion = sizeof(int64_t) * pSubBlock->nRow; - pSubBlock->szTSKEY = sizeof(TSKEY) * pSubBlock->nRow; - - code = tRealloc(ppBuf1, *nDataP + pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM)); - if (code) goto _err; - - // VERSION - memcpy(*ppBuf1 + *nDataP, pBlockData->aVersion, pSubBlock->szVersion); - - // TSKEY - memcpy(*ppBuf1 + *nDataP + pSubBlock->szVersion, pBlockData->aTSKEY, pSubBlock->szTSKEY); + if (IS_VAR_DATA_TYPE(type)) { + size = nEle; } else { - size = (sizeof(int64_t) + sizeof(TSKEY)) * pSubBlock->nRow + COMP_OVERFLOW_BYTES * 2; - - code = tRealloc(ppBuf1, *nDataP + size + sizeof(TSCKSUM)); - if (code) goto _err; - - tsize = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES; - if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { - code = tRealloc(ppBuf2, tsize); - if (code) goto _err; - } - - // VERSION - pSubBlock->szVersion = - tsCompressBigint((char *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, pBlockData->nRow, - *ppBuf1 + *nDataP, size, pSubBlock->cmprAlg, *ppBuf2, tsize); - if (pSubBlock->szVersion <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - - // TSKEY - pSubBlock->szTSKEY = tsCompressTimestamp((char *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, - pBlockData->nRow, *ppBuf1 + *nDataP + pSubBlock->szVersion, - size - pSubBlock->szVersion, pSubBlock->cmprAlg, *ppBuf2, tsize); - if (pSubBlock->szTSKEY <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - - ASSERT(pSubBlock->szVersion + pSubBlock->szTSKEY <= size); + size = tDataTypes[type].bytes * nEle; } - // checksum - size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, *ppBuf1 + *nDataP, size); + if (cmprAlg == NO_COMPRESSION) { + code = tRealloc(ppBuf1, nBuf1 + size); + if (code) goto _exit; - *nDataP += size; - return code; + memcpy(*ppBuf1 + nBuf1, aData, size); + *rSize = size; + } else { + code = tRealloc(ppBuf1, size + COMP_OVERFLOW_BYTES); + if (code) goto _exit; -_err: + if (cmprAlg == TWO_STAGE_COMP) { + code = tRealloc(ppBuf2, size + COMP_OVERFLOW_BYTES); + if (code) goto _exit; + } + + int32_t n = tDataTypes[type].compFunc(aData, tDataTypes[type].bytes * nEle, nEle, *ppBuf1 + nBuf1, + size + COMP_OVERFLOW_BYTES, cmprAlg, *ppBuf2, size + COMP_OVERFLOW_BYTES); + if (n <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _exit; + } + *rSize = n; + } + +_exit: return code; } -static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, SSubBlock *pSubBlock, uint8_t **ppBuf1, - int64_t *nDataP, uint8_t **ppBuf2) { +static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, int8_t cmprAlg, uint8_t **ppBuf1, + int64_t nBuf1, uint8_t **ppBuf2) { int32_t code = 0; int64_t size; int64_t n = 0; // BITMAP if (pColData->flag != HAS_VALUE) { - size = BIT2_SIZE(pColData->nVal) + COMP_OVERFLOW_BYTES; - - code = tRealloc(ppBuf1, *nDataP + n + size); + code = tsdbWriteDataArray(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, + &pBlockCol->szBitmap, ppBuf1, nBuf1 + n, ppBuf2); if (code) goto _err; - - code = tRealloc(ppBuf2, size); - if (code) goto _err; - - pBlockCol->szBitmap = - tsCompressTinyint((char *)pColData->pBitMap, BIT2_SIZE(pColData->nVal), BIT2_SIZE(pColData->nVal), - *ppBuf1 + *nDataP + n, size, TWO_STAGE_COMP, *ppBuf2, size); - if (pBlockCol->szBitmap <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } } else { pBlockCol->szBitmap = 0; } @@ -1857,60 +1822,29 @@ static int32_t tsdbWriteColData(SColData *pColData, SBlockCol *pBlockCol, SSubBl // OFFSET if (IS_VAR_DATA_TYPE(pColData->type)) { - size = sizeof(int32_t) * pColData->nVal + COMP_OVERFLOW_BYTES; - - code = tRealloc(ppBuf1, *nDataP + n + size); + code = tsdbWriteDataArray((uint8_t *)pColData->aOffset, pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, + &pBlockCol->szOffset, ppBuf1, nBuf1 + n, ppBuf2); if (code) goto _err; - - code = tRealloc(ppBuf2, size); - if (code) goto _err; - - pBlockCol->szOffset = tsCompressInt((char *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, pColData->nVal, - *ppBuf1 + *nDataP + n, size, TWO_STAGE_COMP, *ppBuf2, size); - if (pBlockCol->szOffset <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } } else { pBlockCol->szOffset = 0; } n += pBlockCol->szOffset; // VALUE - if (pSubBlock->cmprAlg == NO_COMPRESSION) { - pBlockCol->szValue = pColData->nData; - - code = tRealloc(ppBuf1, *nDataP + n + pBlockCol->szValue + sizeof(TSCKSUM)); + if (pColData->flag != (HAS_NULL | HAS_NONE)) { + code = tsdbWriteDataArray(pColData->pData, pColData->nData, pColData->type, cmprAlg, &pBlockCol->szValue, ppBuf1, + nBuf1 + n, ppBuf2); if (code) goto _err; - - memcpy(*ppBuf1 + *nDataP + n, pColData->pData, pBlockCol->szValue); } else { - size = pColData->nData + COMP_OVERFLOW_BYTES; - - code = tRealloc(ppBuf1, *nDataP + n + size + sizeof(TSCKSUM)); - if (code) goto _err; - - if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { - code = tRealloc(ppBuf2, size); - if (code) goto _err; - } - - pBlockCol->szValue = - tDataTypes[pColData->type].compFunc((char *)pColData->pData, pColData->nData, pColData->nVal, - *ppBuf1 + *nDataP + n, size, pSubBlock->cmprAlg, *ppBuf2, size); - if (pBlockCol->szValue <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } + pBlockCol->szValue = 0; } n += pBlockCol->szValue; - pBlockCol->szOrigin = pColData->nData; // checksum n += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, *ppBuf1 + *nDataP, n); - - *nDataP += n; + code = tRealloc(ppBuf1, nBuf1 + n); + if (code) goto _err; + taosCalcChecksumAppend(0, *ppBuf1 + nBuf1, n); return code; @@ -1918,20 +1852,20 @@ _err: return code; } -static int32_t tsdbWriteBlockDataImpl(TdFilePtr pFD, SSubBlock *pSubBlock, SBlockDataHdr hdr, SArray *aBlockCol, - uint8_t *pData, int64_t nData, uint8_t **ppBuf) { +static int32_t tsdbWriteBlockDataImpl(TdFilePtr pFD, SBlockDataHdr hdr, SArray *aBlockCol, uint8_t *pData, + int64_t nData, uint8_t **ppBuf, int32_t *szBlockCol) { int32_t code = 0; int32_t nBlockCol = taosArrayGetSize(aBlockCol); int64_t size; int64_t n; // HDR + SArray - pSubBlock->szBlockCol = sizeof(hdr); + *szBlockCol = sizeof(hdr); for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) { - pSubBlock->szBlockCol += tPutBlockCol(NULL, taosArrayGet(aBlockCol, iBlockCol)); + (*szBlockCol) += tPutBlockCol(NULL, taosArrayGet(aBlockCol, iBlockCol)); } - code = tRealloc(ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM)); + code = tRealloc(ppBuf, *szBlockCol + sizeof(TSCKSUM)); if (code) goto _err; n = 0; @@ -1940,11 +1874,11 @@ static int32_t tsdbWriteBlockDataImpl(TdFilePtr pFD, SSubBlock *pSubBlock, SBloc for (int32_t iBlockCol = 0; iBlockCol < nBlockCol; iBlockCol++) { n += tPutBlockCol(*ppBuf + n, taosArrayGet(aBlockCol, iBlockCol)); } - taosCalcChecksumAppend(0, *ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM)); + taosCalcChecksumAppend(0, *ppBuf, *szBlockCol + sizeof(TSCKSUM)); - ASSERT(n == pSubBlock->szBlockCol); + ASSERT(n == *szBlockCol); - n = taosWriteFile(pFD, *ppBuf, pSubBlock->szBlockCol + sizeof(TSCKSUM)); + n = taosWriteFile(pFD, *ppBuf, *szBlockCol + sizeof(TSCKSUM)); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -2007,42 +1941,49 @@ _err: return code; } -int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, - SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg) { - int32_t code = 0; - SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; - SBlockCol blockCol; - SBlockCol *pBlockCol = &blockCol; - int64_t n; - TdFilePtr pFileFD = pBlock->last ? pWriter->pLastFD : pWriter->pDataFD; - SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - uint8_t *p; - int64_t nData; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - SArray *aBlockCol = NULL; +int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1, + uint8_t **ppBuf2, int8_t cmprAlg) { + int32_t code = 0; + + ASSERT((pBlockData->suid && pBlockData->uid) || (!pBlockData->suid && pBlockData->uid)); + + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; if (!ppBuf1) ppBuf1 = &pBuf1; if (!ppBuf2) ppBuf2 = &pBuf2; tsdbUpdateBlockInfo(pBlockData, pBlock); + SSubBlock *pSubBlock = &pBlock->aSubBlock[pBlock->nSubBlock++]; + pSubBlock->nRow = pBlockData->nRow; pSubBlock->cmprAlg = cmprAlg; - if (pBlock->last) { - pSubBlock->offset = pWriter->fLast.size; - } else { - pSubBlock->offset = pWriter->fData.size; - } + pSubBlock->offset = pWriter->fData.size; // ======================= BLOCK DATA ======================= - // TSDBKEY - nData = 0; - code = tsdbWriteBlockDataKey(pSubBlock, pBlockData, ppBuf1, &nData, ppBuf2); + int64_t nBuf1 = 0; + + // VERSION + code = tsdbWriteDataArray((uint8_t *)pBlockData->aVersion, pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, + &pSubBlock->szVersion, ppBuf1, nBuf1, ppBuf2); if (code) goto _err; + nBuf1 += pSubBlock->szVersion; + + // TSKEY + code = tsdbWriteDataArray((uint8_t *)pBlockData->aTSKEY, pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg, + &pSubBlock->szTSKEY, ppBuf1, nBuf1, ppBuf2); + if (code) goto _err; + nBuf1 += pSubBlock->szTSKEY; + + // checksum + nBuf1 += sizeof(TSCKSUM); + code = tRealloc(ppBuf1, nBuf1); + if (code) goto _err; + taosCalcChecksumAppend(0, *ppBuf1, nBuf1); // COLUMNS - aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol)); + SArray *aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol)); if (aBlockCol == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -2050,46 +1991,46 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_ int32_t offset = 0; for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aIdx); iCol++) { SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); + SBlockCol blockCol = {0}; ASSERT(pColData->flag); if (pColData->flag == HAS_NONE) continue; - pBlockCol->cid = pColData->cid; - pBlockCol->type = pColData->type; - pBlockCol->smaOn = pColData->smaOn; - pBlockCol->flag = pColData->flag; + blockCol.cid = pColData->cid; + blockCol.type = pColData->type; + blockCol.smaOn = pColData->smaOn; + blockCol.flag = pColData->flag; + blockCol.szOrigin = pColData->nData; if (pColData->flag != HAS_NULL) { - code = tsdbWriteColData(pColData, pBlockCol, pSubBlock, ppBuf1, &nData, ppBuf2); + code = tsdbWriteColData(pColData, &blockCol, cmprAlg, ppBuf1, nBuf1, ppBuf2); if (code) goto _err; - pBlockCol->offset = offset; - offset = offset + pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue + sizeof(TSCKSUM); + blockCol.offset = offset; + offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); + nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); } - if (taosArrayPush(aBlockCol, pBlockCol) == NULL) { + if (taosArrayPush(aBlockCol, &blockCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } } // write - code = tsdbWriteBlockDataImpl(pFileFD, pSubBlock, hdr, aBlockCol, *ppBuf1, nData, ppBuf2); + SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockData->suid, .uid = pBlockData->uid}; + code = tsdbWriteBlockDataImpl(pWriter->pDataFD, hdr, aBlockCol, *ppBuf1, nBuf1, ppBuf2, &pSubBlock->szBlockCol); if (code) goto _err; - pSubBlock->szBlock = pSubBlock->szBlockCol + sizeof(TSCKSUM) + nData; - if (pBlock->last) { - pWriter->fLast.size += pSubBlock->szBlock; - } else { - pWriter->fData.size += pSubBlock->szBlock; - } + pSubBlock->szBlock = pSubBlock->szBlockCol + sizeof(TSCKSUM) + nBuf1; + pWriter->fData.size += pSubBlock->szBlock; // ======================= BLOCK SMA ======================= pSubBlock->sOffset = 0; pSubBlock->nSma = 0; - if (pBlock->nSubBlock > 1 || pBlock->last || pBlock->hasDup) goto _exit; + if (pBlock->nSubBlock > 1 || pBlock->hasDup) goto _exit; code = tsdbWriteBlockSma(pWriter->pSmaFD, pBlockData, pSubBlock, ppBuf1); if (code) goto _err; @@ -2113,17 +2054,113 @@ _err: return code; } -int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock *pBlock, uint8_t **ppBuf1, - uint8_t **ppBuf2, int8_t cmprAlg) { - int32_t code = 0; - // TODO - return code; -} - int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockL *pBlockL, uint8_t **ppBuf1, uint8_t **ppBuf2, int8_t cmprAlg) { int32_t code = 0; - // TODO + + ASSERT((pBlockData->suid && !pBlockData->uid) || (!pBlockData->suid && pBlockData->uid)); + + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; + + if (!ppBuf1) ppBuf1 = &pBuf1; + if (!ppBuf2) ppBuf2 = &pBuf2; + + pBlockL->suid = pBlockData->suid; + if (pBlockData->uid) { + pBlockL->maxUid = pBlockL->minUid = pBlockData->uid; + } else { + pBlockL->minUid = pBlockData->aUid[0]; + pBlockL->maxUid = pBlockData->aUid[pBlockData->nRow - 1]; + } + pBlockL->nRow = pBlockData->nRow; + pBlockL->offset = pWriter->fLast.size; + pBlockL->cmprAlg = cmprAlg; + pBlockL->minVer = VERSION_MAX; + pBlockL->maxVer = VERSION_MIN; + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + pBlockL->minVer = TMIN(pBlockL->minVer, pBlockData->aVersion[iRow]); + pBlockL->maxVer = TMAX(pBlockL->maxVer, pBlockData->aVersion[iRow]); + } + + // ======================= BLOCK DATA ======================= + int64_t nBuf1 = 0; + + // UID + if (pBlockData->uid == 0) { + code = tsdbWriteDataArray((uint8_t *)pBlockData->aUid, pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, + &pBlockL->szUid, ppBuf1, nBuf1, ppBuf2); + if (code) goto _err; + } else { + pBlockL->szUid = 0; + } + nBuf1 += pBlockL->szUid; + + // VERSION + code = tsdbWriteDataArray((uint8_t *)pBlockData->aVersion, pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, + &pBlockL->szVer, ppBuf1, nBuf1, ppBuf2); + if (code) goto _err; + nBuf1 += pBlockL->szVer; + + // TSKEY + code = tsdbWriteDataArray((uint8_t *)pBlockData->aTSKEY, pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg, + &pBlockL->szTSKEY, ppBuf1, nBuf1, ppBuf2); + if (code) goto _err; + nBuf1 += pBlockL->szTSKEY; + + // checksum + nBuf1 += sizeof(TSCKSUM); + code = tRealloc(ppBuf1, nBuf1); + if (code) goto _err; + taosCalcChecksumAppend(0, *ppBuf1, nBuf1); + + // COLUMNS + SArray *aBlockCol = taosArrayInit(taosArrayGetSize(pBlockData->aIdx), sizeof(SBlockCol)); + if (aBlockCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + int32_t offset = 0; + for (int32_t iCol = 0; iCol < taosArrayGetSize(pBlockData->aIdx); iCol++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); + SBlockCol blockCol = {0}; + + ASSERT(pColData->flag); + + if (pColData->flag == HAS_NONE) continue; + + blockCol.cid = pColData->cid; + blockCol.type = pColData->type; + blockCol.smaOn = pColData->smaOn; + blockCol.flag = pColData->flag; + blockCol.szOrigin = pColData->nData; + + if (pColData->flag != HAS_NULL) { + code = tsdbWriteColData(pColData, &blockCol, cmprAlg, ppBuf1, nBuf1, ppBuf2); + if (code) goto _err; + + blockCol.offset = offset; + offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); + nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); + } + + if (taosArrayPush(aBlockCol, &blockCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + // write + SBlockDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, .suid = pBlockData->suid, .uid = pBlockData->uid}; + code = tsdbWriteBlockDataImpl(pWriter->pLastFD, hdr, aBlockCol, *ppBuf1, nBuf1, ppBuf2, &pBlockL->szBlockCol); + if (code) goto _err; + + pBlockL->szBlock = pBlockL->szBlockCol + sizeof(TSCKSUM) + nBuf1; + pWriter->fLast.size += pBlockL->szBlock; + + return code; + +_err: return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 7beac23912..c17a7f4a09 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -230,7 +230,6 @@ int32_t tPutBlock(uint8_t *p, void *ph) { n += tPutI64v(p ? p + n : p, pBlock->minVersion); n += tPutI64v(p ? p + n : p, pBlock->maxVersion); n += tPutI32v(p ? p + n : p, pBlock->nRow); - n += tPutI8(p ? p + n : p, pBlock->last); n += tPutI8(p ? p + n : p, pBlock->hasDup); n += tPutI8(p ? p + n : p, pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { @@ -257,7 +256,6 @@ int32_t tGetBlock(uint8_t *p, void *ph) { n += tGetI64v(p + n, &pBlock->minVersion); n += tGetI64v(p + n, &pBlock->maxVersion); n += tGetI32v(p + n, &pBlock->nRow); - n += tGetI8(p + n, &pBlock->last); n += tGetI8(p + n, &pBlock->hasDup); n += tGetI8(p + n, &pBlock->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { @@ -290,7 +288,6 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) { bool tBlockHasSma(SBlock *pBlock) { if (pBlock->nSubBlock > 1) return false; - if (pBlock->last) return false; if (pBlock->hasDup) return false; return pBlock->aSubBlock[0].nSma > 0; @@ -301,22 +298,18 @@ int32_t tPutBlockL(uint8_t *p, void *ph) { SBlockL *pBlockL = (SBlockL *)ph; n += tPutI64(p ? p + n : p, pBlockL->suid); - n += tPutI64(p ? p + n : p, pBlockL->minKey.uid); - n += tPutI64v(p ? p + n : p, pBlockL->minKey.version); - n += tPutI64(p ? p + n : p, pBlockL->minKey.ts); - n += tPutI64(p ? p + n : p, pBlockL->maxKey.uid); - n += tPutI64v(p ? p + n : p, pBlockL->maxKey.version); - n += tPutI64(p ? p + n : p, pBlockL->maxKey.ts); + n += tPutI64(p ? p + n : p, pBlockL->minUid); + n += tPutI64(p ? p + n : p, pBlockL->maxUid); n += tPutI64v(p ? p + n : p, pBlockL->minVer); n += tPutI64v(p ? p + n : p, pBlockL->maxVer); n += tPutI32v(p ? p + n : p, pBlockL->nRow); - n += tPutI8(p ? p + n : p, pBlockL->cmprAlg); n += tPutI64v(p ? p + n : p, pBlockL->offset); - n += tPutI32v(p ? p + n : p, pBlockL->szBlock); + n += tPutI8(p ? p + n : p, pBlockL->cmprAlg); n += tPutI32v(p ? p + n : p, pBlockL->szBlockCol); n += tPutI32v(p ? p + n : p, pBlockL->szUid); n += tPutI32v(p ? p + n : p, pBlockL->szVer); n += tPutI32v(p ? p + n : p, pBlockL->szTSKEY); + n += tPutI32v(p ? p + n : p, pBlockL->szBlock); return n; } @@ -326,22 +319,18 @@ int32_t tGetBlockL(uint8_t *p, void *ph) { SBlockL *pBlockL = (SBlockL *)ph; n += tGetI64(p + n, &pBlockL->suid); - n += tGetI64(p + n, &pBlockL->minKey.uid); - n += tGetI64v(p + n, &pBlockL->minKey.version); - n += tGetI64(p + n, &pBlockL->minKey.ts); - n += tGetI64(p + n, &pBlockL->maxKey.uid); - n += tGetI64v(p + n, &pBlockL->maxKey.version); - n += tGetI64(p + n, &pBlockL->maxKey.ts); + n += tGetI64(p + n, &pBlockL->minUid); + n += tGetI64(p + n, &pBlockL->maxUid); n += tGetI64v(p + n, &pBlockL->minVer); n += tGetI64v(p + n, &pBlockL->maxVer); n += tGetI32v(p + n, &pBlockL->nRow); - n += tGetI8(p + n, &pBlockL->cmprAlg); n += tGetI64v(p + n, &pBlockL->offset); - n += tGetI32v(p + n, &pBlockL->szBlock); + n += tGetI8(p + n, &pBlockL->cmprAlg); n += tGetI32v(p + n, &pBlockL->szBlockCol); n += tGetI32v(p + n, &pBlockL->szUid); n += tGetI32v(p + n, &pBlockL->szVer); n += tGetI32v(p + n, &pBlockL->szTSKEY); + n += tGetI32v(p + n, &pBlockL->szBlock); return n; }