diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index 156b66ae86..fee89e2f37 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -392,10 +392,10 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = { getStatics_i64}, {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f}, {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d}, - {TSDB_DATA_TYPE_VARCHAR, 6, 0, "VARCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_bin}, + {TSDB_DATA_TYPE_VARCHAR, 6, 1, "VARCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_bin}, {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64}, - {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr}, + {TSDB_DATA_TYPE_NCHAR, 5, 1, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr}, {TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint, getStatics_u8}, {TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint, diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 32037cbf19..72296e4551 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -47,6 +47,7 @@ typedef struct SBlockL SBlockL; typedef struct SColData SColData; typedef struct SBlockDataHdr SBlockDataHdr; typedef struct SBlockData SBlockData; +typedef struct SDiskData SDiskData; typedef struct SDelFile SDelFile; typedef struct SHeadFile SHeadFile; typedef struct SDataFile SDataFile; @@ -149,6 +150,11 @@ SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); int32_t tPutBlockData(uint8_t *p, SBlockData 
*pBlockData); int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); +// SDiskData +int32_t tDiskDataInit(SDiskData *pDiskData); +void tDiskDataClear(SDiskData *pDiskData); +int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); +int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); @@ -392,38 +398,35 @@ struct SMapData { }; typedef struct { - int16_t cid; - int8_t type; - int8_t smaOn; - int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE - int32_t szOrigin; // original column value size (only save for variant data type) - int32_t szBitmap; // bitmap size - int32_t szOffset; // size of offset, only for variant-length data type - int32_t szValue; // compressed column value size - int32_t offset; + int16_t cid; + int8_t type; + int8_t smaOn; + int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE + int32_t szOrigin; // original column value size (only save for variant data type) + int32_t szBitmap; // bitmap size + int32_t szOffset; // size of offset, only for variant-length data type + int32_t szValue; // compressed column value size + int32_t offset; + uint8_t **ppData; } SBlockCol; typedef struct { - int32_t nRow; - int8_t cmprAlg; - int64_t offset; // block data offset - int32_t szBlockCol; // SBlockCol size - int32_t szVersion; // VERSION size - int32_t szTSKEY; // TSKEY size - int32_t szBlock; // total block size - int64_t sOffset; // sma offset - int32_t nSma; // sma size + int64_t offset; // block data offset + int32_t szBlock; + int32_t szKey; } SSubBlock; struct SBlock { TSDBKEY minKey; TSDBKEY maxKey; - int64_t minVersion; - int64_t maxVersion; + int64_t minVer; + int64_t maxVer; int32_t nRow; int8_t hasDup; int8_t nSubBlock; SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS]; + int64_t sOffset; // sma offset + int32_t nSma; // sma size }; struct SBlockL { @@ -433,13 +436,6 @@ struct SBlockL { int64_t minVer; int64_t maxVer; int32_t 
nRow; - int64_t offset; - int8_t cmprAlg; - int32_t szBlockCol; - int32_t szUid; - int32_t szVer; - int32_t szTSKEY; - int32_t szBlock; }; struct SColData { @@ -502,13 +498,17 @@ struct SDelIdx { int64_t size; }; -#pragma pack(push, 1) struct SBlockDataHdr { uint32_t delimiter; + int32_t nRow; int64_t suid; int64_t uid; + int32_t szUid; + int32_t szVer; + int32_t szKey; + int32_t szBlkCol; + int8_t cmprAlg; }; -#pragma pack(pop) struct SDelFile { volatile int32_t nRef; @@ -596,6 +596,24 @@ struct STsdbReadSnap { STsdbFS fs; }; +struct SDiskData { + int8_t cmprAlg; + int32_t nRow; + int64_t suid; + int64_t uid; + int32_t szUid; + int32_t szVer; + int32_t szKey; + + uint8_t *pUid; + uint8_t *pVer; + uint8_t *pKey; + SArray *aBlockCol; // SArray<SBlockCol> + int32_t nBuf; + SArray *aBuf; // SArray<uint8_t *> + uint8_t *pBuf; +}; + // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { TSDBKEY *pKey1 = (TSDBKEY *)p1; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index cadb6fefd2..84ecaa605d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -630,7 +630,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ } // 2. 
version range check - if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) { + if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) { continue; } @@ -766,7 +766,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, - pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); + pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -798,7 +798,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, - pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); + pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -1011,8 +1011,8 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) { return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) || (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) || - (pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) || - (pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion); + (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) || + (pVerRange->maxVer < pBlock->maxVer && 
pVerRange->maxVer >= pBlock->minVer); } static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, @@ -1092,8 +1092,8 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) } static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { - return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && - (pBlock->minVersion <= pVerRange->maxVer); + return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVer >= pVerRange->minVer) && + (pBlock->minVer <= pVerRange->maxVer); } static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) { @@ -1102,11 +1102,11 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) { - if (p->version >= pBlock->minVersion) { + if (p->version >= pBlock->minVer) { return true; } } else if (p->ts < pBlock->minKey.ts) { // p->ts < pBlock->minKey.ts - if (p->version >= pBlock->minVersion) { + if (p->version >= pBlock->minVer) { if (i < num - 1) { TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); if (i + 1 == num - 1) { // pnext is the last point @@ -1114,7 +1114,7 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons return true; } } else { - if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) { + if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) { return true; } } @@ -2512,9 +2512,7 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { return metaGetIvtIdx(pMeta); } -uint64_t getReaderMaxVersion(STsdbReader *pReader) { - return pReader->verRange.maxVer; -} +uint64_t getReaderMaxVersion(STsdbReader* pReader) { return 
pReader->verRange.maxVer; } /** * @brief Get all suids since suid diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index bc4656278c..eeb11abc97 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1909,8 +1909,8 @@ static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { pBlock->maxKey = key; } - pBlock->minVersion = TMIN(pBlock->minVersion, key.version); - pBlock->maxVersion = TMAX(pBlock->maxVersion, key.version); + pBlock->minVer = TMIN(pBlock->minVer, key.version); + pBlock->maxVer = TMAX(pBlock->maxVer, key.version); } pBlock->nRow += pBlockData->nRow; } @@ -2310,9 +2310,13 @@ int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock pBlockL->szBlock = pBlockL->szBlockCol + sizeof(TSCKSUM) + nBuf1; pWriter->fLast.size += pBlockL->szBlock; + tFree(pBuf1); + tFree(pBuf2); return code; _err: + tFree(pBuf1); + tFree(pBuf2); return code; } @@ -2426,4 +2430,4 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { _err: tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 1ae241f0c1..13d293f86b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -227,8 +227,8 @@ int32_t tPutBlock(uint8_t *p, void *ph) { n += tPutTSDBKEY(p ? p + n : p, &pBlock->minKey); n += tPutTSDBKEY(p ? p + n : p, &pBlock->maxKey); - n += tPutI64v(p ? p + n : p, pBlock->minVersion); - n += tPutI64v(p ? p + n : p, pBlock->maxVersion); + n += tPutI64v(p ? p + n : p, pBlock->minVer); + n += tPutI64v(p ? p + n : p, pBlock->maxVer); n += tPutI32v(p ? p + n : p, pBlock->nRow); n += tPutI8(p ? p + n : p, pBlock->hasDup); n += tPutI8(p ? 
p + n : p, pBlock->nSubBlock); @@ -253,8 +253,8 @@ int32_t tGetBlock(uint8_t *p, void *ph) { n += tGetTSDBKEY(p + n, &pBlock->minKey); n += tGetTSDBKEY(p + n, &pBlock->maxKey); - n += tGetI64v(p + n, &pBlock->minVersion); - n += tGetI64v(p + n, &pBlock->maxVersion); + n += tGetI64v(p + n, &pBlock->minVer); + n += tGetI64v(p + n, &pBlock->maxVer); n += tGetI32v(p + n, &pBlock->nRow); n += tGetI8(p + n, &pBlock->hasDup); n += tGetI8(p + n, &pBlock->nSubBlock); @@ -1509,6 +1509,206 @@ int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) { return n; } +// SDiskData ============================== +static int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, + int32_t *szOut, uint8_t **ppBuf) { + int32_t code = 0; + + ASSERT(szIn > 0 && ppOut); + + if (cmprAlg == NO_COMPRESSION) { + code = tRealloc(ppOut, nOut + szIn); + if (code) goto _exit; + + memcpy(*ppOut + nOut, pIn, szIn); + *szOut = szIn; + } else { + int32_t size = szIn + COMP_OVERFLOW_BYTES; + + code = tRealloc(ppOut, nOut + size); + if (code) goto _exit; + + if (cmprAlg == TWO_STAGE_COMP) { + ASSERT(ppBuf); + code = tRealloc(ppBuf, size); + if (code) goto _exit; + } + + *szOut = + tDataTypes[type].compFunc(pIn, szIn, szIn / tDataTypes[type].bytes, *ppOut + nOut, size, cmprAlg, *ppBuf, size); + if (*szOut <= 0) { + code = TSDB_CODE_COMPRESS_ERROR; + goto _exit; + } + } + +_exit: + return code; +} + +static int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) { + int32_t code = 0; + + int32_t n = 0; + // bitmap + if (pColData->flag != HAS_VALUE) { + code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg, + pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf); + if (code) goto _exit; + } else { + pBlockCol->szBitmap = 0; + } + n += pBlockCol->szBitmap; + + // offset + if (IS_VAR_DATA_TYPE(pColData->type)) { + code = tsdbCmprData((uint8_t *)pColData->aOffset, 
sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg, + pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf); + if (code) goto _exit; + } else { + pBlockCol->szOffset = 0; + } + n += pBlockCol->szOffset; + + // value + if (pColData->flag != (HAS_NULL | HAS_NONE)) { + code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n, + &pBlockCol->szValue, ppBuf); + if (code) goto _exit; + } else { + pBlockCol->szValue = 0; + } + n += pBlockCol->szValue; + + // checksum + n += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, *ppBuf, n); + +_exit: + return code; +} + +static int32_t tsdbDecmprData() { + int32_t code = 0; + // TODO + return code; +} + +int32_t tDiskDataInit(SDiskData *pDiskData) { + int32_t code = 0; + + pDiskData->aBlockCol = taosArrayInit(0, sizeof(SBlockCol)); + if (pDiskData->aBlockCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + pDiskData->aBuf = taosArrayInit(0, sizeof(uint8_t *)); + if (pDiskData->aBuf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + +_exit: + return code; +} + +void tDiskDataClear(SDiskData *pDiskData) { + taosArrayDestroy(pDiskData->aBlockCol); + for (int32_t i = 0; i < taosArrayGetSize(pDiskData->aBuf); i++) { + tFree((uint8_t *)taosArrayGet(pDiskData->aBuf, i)); + } + taosArrayDestroy(pDiskData->aBuf); +} + +static uint8_t **tDiskDataAllocBuf(SDiskData *pDiskData) { + if (pDiskData->nBuf >= taosArrayGetSize(pDiskData->aBuf)) { + uint8_t *p = NULL; + if (taosArrayPush(pDiskData->aBuf, &p) == NULL) { + return NULL; + } + } + + ASSERT(pDiskData->nBuf < taosArrayGetSize(pDiskData->aBuf)); + uint8_t **pp = taosArrayGet(pDiskData->aBuf, pDiskData->nBuf); + pDiskData->nBuf++; + return pp; +} + +int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg) { + int32_t code = 0; + + ASSERT(pBlockData->nRow > 0); + + pDiskData->cmprAlg = cmprAlg; + pDiskData->nRow = pBlockData->nRow; + pDiskData->suid = pBlockData->suid; + 
pDiskData->uid = pBlockData->uid; + pDiskData->szUid = 0; + pDiskData->szVer = 0; + pDiskData->szKey = 0; + taosArrayClear(pDiskData->aBlockCol); + pDiskData->nBuf = 0; + + // uid + if (pDiskData->uid == 0) { + code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg, + &pDiskData->pUid, &pDiskData->szUid, &pDiskData->pBuf); + if (code) goto _exit; + } + + // version + code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, + cmprAlg, &pDiskData->pVer, &pDiskData->szVer, &pDiskData->pBuf); + if (code) goto _exit; + + // tskey + code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP, + cmprAlg, &pDiskData->pKey, &pDiskData->szKey, &pDiskData->pBuf); + if (code) goto _exit; + + // columns + int32_t offset = 0; + for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); + + if (pColData->flag == HAS_NONE) continue; + + SBlockCol blockCol = {.cid = pColData->cid, + .type = pColData->type, + .smaOn = pColData->smaOn, + .flag = pColData->flag, + .szOrigin = pColData->nData}; + + if (pColData->flag != HAS_NULL) { + // alloc a buffer + blockCol.ppData = tDiskDataAllocBuf(pDiskData); + if (blockCol.ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // compress + code = tsdbCmprColData(pColData, cmprAlg, &blockCol); + if (code) goto _exit; + + // update offset + blockCol.offset = offset; + offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue; + } + + if (taosArrayPush(pDiskData->aBlockCol, &blockCol) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + +_exit: + return code; +} +int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg); +int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData); + // 
ALGORITHM ============================== void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { SColVal colVal;