diff --git a/include/util/tcompression.h b/include/util/tcompression.h index fd01bcf1a3..8d7c607e6b 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -131,7 +131,7 @@ typedef struct SCompressor SCompressor; int32_t tCompressorCreate(SCompressor **ppCmprsor); int32_t tCompressorDestroy(SCompressor *pCmprsor); int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); -int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData); +int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index 3904012988..a5104dcfbd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -349,6 +349,7 @@ struct SDiskData { SCompressor *pKeyC; int32_t nDiskCol; SArray *aDiskCol; + uint8_t *aBuf[2]; }; int32_t tDiskDataInit(SDiskData *pDiskData, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) { @@ -425,6 +426,9 @@ int32_t tDiskDataDestroy(SDiskData *pDiskData) { } taosArrayDestroy(pDiskData->aDiskCol); } + for (int32_t iBuf = 0; iBuf < sizeof(pDiskData->aBuf) / sizeof(pDiskData->aBuf[0]); iBuf++) { + tFree(pDiskData->aBuf[iBuf]); + } return code; } @@ -435,8 +439,17 @@ int32_t tDiskDataAddRow(SDiskData *pDiskData, TSDBROW *pRow, STSchema *pTSchema, ASSERT(pId->suid == pDiskData->suid); // uid - code = tCompress(pDiskData->pUidC, &pId->uid, sizeof(int64_t)); - if (code) goto _exit; + if (pDiskData->uid && pDiskData->uid != pId->uid) { + for (int32_t iRow = 0; iRow < pDiskData->nRow; iRow++) { + code = tCompress(pDiskData->pUidC, &pDiskData->uid, sizeof(int64_t)); + if (code) goto _exit; + } + pDiskData->uid = 0; + } + if (pDiskData->uid == 0) { + code = tCompress(pDiskData->pUidC, &pId->uid, sizeof(int64_t)); + if (code) goto _exit; + } // version int64_t version = TSDBROW_VERSION(pRow); @@ -474,3 +487,87 @@ int32_t tDiskDataAddRow(SDiskData *pDiskData, TSDBROW *pRow, STSchema *pTSchema, _exit: return code; } + +int32_t tDiskDataToBinary(SDiskData *pDiskData, const uint8_t **ppData, int32_t *nData) { + int32_t code = 0; + + ASSERT(pDiskData->nRow); + + SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, + .fmtVer = 0, + .suid = pDiskData->suid, + .uid = pDiskData->uid, + .szUid = 0, + .szVer = 0, + .szKey = 0, + .szBlkCol = 0, + .nRow = pDiskData->nRow, + .cmprAlg = pDiskData->cmprAlg}; + + // UID + const uint8_t *pUid = NULL; + if (pDiskData->uid == 0) { + code = tCompGen(pDiskData->pUidC, &pUid, &hdr.szUid); + if (code) return code; + } + + // VERSION + const uint8_t *pVer = NULL; + code = tCompGen(pDiskData->pVerC, &pVer, &hdr.szVer); + if (code) return code; + + // TSKEY + const uint8_t *pKey = NULL; + code = tCompGen(pDiskData->pKeyC, &pKey, &hdr.szKey); + if (code) return code; + + int32_t offset = 0; + for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + + if (pDiskCol->flag == HAS_NONE) continue; + + // code = tDiskColToBinary(pDiskCol, ); + // if (code) return code; + + SBlockCol bCol = {.cid = pDiskCol->cid, + .type = pDiskCol->type, + // .smaOn = , + .flag = pDiskCol->flag, + // .szOrigin = + // .szBitmap = + // .szOffset = + // .szValue = + .offset = offset}; + + hdr.szBlkCol += tPutBlockCol(NULL, &bCol); + offset = offset + bCol.szBitmap + bCol.szOffset + bCol.szValue; + } + + *nData = tPutDiskDataHdr(NULL, &hdr) + hdr.szUid + hdr.szVer + hdr.szKey + hdr.szBlkCol + offset; + code = tRealloc(&pDiskData->aBuf[0], *nData); + if (code) return code; + *ppData = pDiskData->aBuf[0]; + + int32_t n = 0; + n += tPutDiskDataHdr(pDiskData->aBuf[0] + n, &hdr); + if (hdr.szUid) { + memcpy(pDiskData->aBuf[0] + n, pUid, hdr.szUid); + n += hdr.szUid; + } + memcpy(pDiskData->aBuf[0] + n, pVer, hdr.szVer); + n += hdr.szVer; + memcpy(pDiskData->aBuf[0] + n, pKey, hdr.szKey); + n += hdr.szKey; + for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + n += tPutBlockCol(pDiskData->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */); + } + for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) { + SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol); + // memcpy(pDiskData->aBuf[0] + n, NULL, ); + // n += 0; + } + + return code; +} diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 5d6e6d12ac..cd9ff5e325 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1574,7 +1574,7 @@ int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { return code; } -int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int64_t *nData) { +int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData) { int32_t code = 0; if (pCmprsor->nVal == 0) {