diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0c706c3b6b..9e02453ccf 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -65,6 +65,7 @@ typedef struct SBlockInfo SBlockInfo; typedef struct SSmaInfo SSmaInfo; typedef struct SBlockCol SBlockCol; +#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_MAX_SUBBLOCKS 8 #define TSDB_FHDR_SIZE 512 @@ -150,12 +151,7 @@ SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlockData *pBlockData); - -#if 1 -int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); -int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData); -int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); -#endif +int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); // SDiskDataHdr int32_t tPutDiskDataHdr(uint8_t *p, void *ph); int32_t tGetDiskDataHdr(uint8_t *p, void *ph); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 55ef576916..dc9cf19593 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -15,8 +15,6 @@ #include "tsdb.h" -#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) - // SDelFWriter ==================================================== int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 744f16b113..ce970ea870 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -94,6 +94,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { if (pReader->pBlockIdx && pReader->pBlockL) { TABLEID id = {.suid = pReader->pBlockL->suid, .uid = pReader->pBlockL->minUid}; + ASSERT(0); + // if (tTABLEIDCmprFn(pReader->pBlockIdx, &minId) < 0) { // // TODO // } else if (tTABLEIDCmprFn(pReader->pBlockIdx, &maxId) < 0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 70d723cb3d..740da45393 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1500,50 +1500,114 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD *ppColData = NULL; } -int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData) { - int32_t n = 0; +int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut) { + int32_t code = 0; - n += tPutI32v(p ? p + n : p, pBlockData->nRow); - if (p) { - memcpy(p + n, pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow); - } - n = n + sizeof(int64_t) * pBlockData->nRow; - if (p) { - memcpy(p + n, pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow); - } - n = n + sizeof(TSKEY) * pBlockData->nRow; + SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT, + .fmtVer = 0, + .suid = pBlockData->suid, + .uid = pBlockData->uid, + .nRow = pBlockData->nRow, + .cmprAlg = cmprAlg}; - int32_t nCol = taosArrayGetSize(pBlockData->aIdx); - n += tPutI32v(p ? p + n : p, nCol); - for (int32_t iCol = 0; iCol < nCol; iCol++) { - SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); - n += tPutColData(p ? p + n : p, pColData); + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; + uint8_t *pBuf3 = NULL; + uint8_t *pBuf4 = NULL; + + // encode ================= + // columns AND SBlockCol + int32_t nBuf1 = 0; + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + + ASSERT(pColData->flag); + + if (pColData->flag == HAS_NONE) continue; + + SBlockCol blockCol = {.cid = pColData->cid, + .type = pColData->type, + .smaOn = pColData->smaOn, + .flag = pColData->flag, + .szOrigin = pColData->nData}; + + if (pColData->flag != HAS_NULL) { + code = tsdbCmprColData(pColData, cmprAlg, &blockCol, &pBuf1, nBuf1, &pBuf3); + if (code) goto _exit; + + blockCol.offset = nBuf1; + nBuf1 = nBuf1 + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue + sizeof(TSCKSUM); + } + + code = tRealloc(&pBuf2, hdr.szBlkCol + tPutBlockCol(NULL, &blockCol)); + if (code) goto _exit; + hdr.szBlkCol += tPutBlockCol(pBuf2 + hdr.szBlkCol, &blockCol); } - return n; + int32_t nBuf2 = 0; + if (hdr.szBlkCol > 0) { + nBuf2 = hdr.szBlkCol + sizeof(TSCKSUM); + + code = tRealloc(&pBuf2, nBuf2); + if (code) goto _exit; + + taosCalcChecksumAppend(0, pBuf2, nBuf2); + } + + // uid + version + tskey + int32_t nBuf3 = 0; + if (pBlockData->uid == 0) { + code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, + &pBuf3, nBuf3, &hdr.szUid, &pBuf4); + if (code) goto _exit; + } + nBuf3 += hdr.szUid; + + code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, + cmprAlg, &pBuf3, nBuf3, &hdr.szVer, &pBuf4); + if (code) goto _exit; + nBuf3 += hdr.szVer; + + code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, + cmprAlg, &pBuf3, nBuf3, &hdr.szKey, &pBuf4); + if (code) goto _exit; + nBuf3 += hdr.szKey; + + nBuf3 += sizeof(TSCKSUM); + code = tRealloc(&pBuf3, nBuf3); + if (code) goto _exit; + + // hdr + int32_t nBuf4 = tPutDiskDataHdr(NULL, &hdr); + code = tRealloc(&pBuf4, nBuf4); + if (code) goto _exit; + tPutDiskDataHdr(pBuf4, &hdr); + taosCalcChecksumAppend(taosCalcChecksum(0, pBuf4, nBuf4), pBuf3, nBuf3); + + // aggragate + if (ppOut) { + *szOut = nBuf1 + nBuf2 + nBuf3 + nBuf4; + code = tRealloc(ppOut, *szOut); + if (code) goto _exit; + + memcpy(*ppOut, pBuf4, nBuf4); + memcpy(*ppOut + nBuf4, pBuf3, nBuf3); + if (nBuf2) { + memcpy(*ppOut + nBuf4 + nBuf3, pBuf2, nBuf2); + } + if (nBuf1) { + memcpy(*ppOut + nBuf4 + nBuf3 + nBuf2, pBuf1, nBuf1); + } + } + +_exit: + return code; } -int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) { - int32_t n = 0; - - tBlockDataReset(pBlockData); - - n += tGetI32v(p + n, &pBlockData->nRow); - pBlockData->aVersion = (int64_t *)(p + n); - n = n + sizeof(int64_t) * pBlockData->nRow; - pBlockData->aTSKEY = (TSKEY *)(p + n); - n = n + sizeof(TSKEY) * pBlockData->nRow; - - int32_t nCol; - n += tGetI32v(p + n, &nCol); - for (int32_t iCol = 0; iCol < nCol; iCol++) { - SColData *pColData; - - if (tBlockDataAddColData(pBlockData, iCol, &pColData)) return -1; - n += tGetColData(p + n, pColData); - } - - return n; +int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData) { + int32_t code = 0; + // TODO + return code; } // SDiskDataHdr ==============================