diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 50b3540fd2..ffcb604191 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -115,6 +115,7 @@ int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColum SValueColumn *valCol, SBuffer *helperBuffer); int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer); int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo); +int32_t tValueCompare(const SValue *tv1, const SValue *tv2); // SRow ================================ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 13da2238bb..ac5b50c891 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1250,7 +1250,7 @@ void tRowGetKey(SRow *pRow, SRowKey *key) { } \ } while (0) -static int32_t tValueCompare(const SValue *tv1, const SValue *tv2) { +int32_t tValueCompare(const SValue *tv1, const SValue *tv2) { ASSERT(tv1->type == tv2->type); switch (tv1->type) { diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 0b86cae1be..f4567547ec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -15,10 +15,10 @@ #include "tsdb.h" #include "tsdbFSet2.h" -#include "tsdbUtil2.h" #include "tsdbMerge.h" #include "tsdbReadUtil.h" #include "tsdbSttFileRW.h" +#include "tsdbUtil2.h" static void tLDataIterClose2(SLDataIter *pIter); @@ -60,7 +60,7 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { pLoadInfo->currentLoadBlockIndex = 1; - SBlockDataInfo* pInfo = &pLoadInfo->blockData[0]; + SBlockDataInfo *pInfo = &pLoadInfo->blockData[0]; tBlockDataDestroy(&pInfo->data); pInfo->sttBlockIndex = -1; pInfo->pin = false; @@ -88,7 +88,7 @@ void destroyLDataIter(SLDataIter *pIter) { taosMemoryFree(pIter); } -void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoadCost) { +void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) { if (pLDataIterArray == NULL) { return NULL; } @@ -115,7 +115,7 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo* pLoa } // choose the unpinned slot to load next data block -static void updateBlockLoadSlot(SSttBlockLoadInfo* pLoadInfo) { +static void updateBlockLoadSlot(SSttBlockLoadInfo *pLoadInfo) { int32_t nextSlotIndex = pLoadInfo->currentLoadBlockIndex ^ 1; if (pLoadInfo->blockData[nextSlotIndex].pin) { nextSlotIndex = nextSlotIndex ^ 1; @@ -180,7 +180,7 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { pInfo->blockData[1].sttBlockIndex, pIter->iRow, idStr); return &pInfo->blockData[pInfo->currentLoadBlockIndex].data; - _exit: +_exit: if (code != TSDB_CODE_SUCCESS) { terrno = code; } @@ -325,7 +325,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } int32_t startIndex = 0; - while((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) { + while ((startIndex < numOfBlocks) && (pStatisBlkArray->data[startIndex].maxTbid.suid < suid)) { ++startIndex; } @@ -334,7 +334,7 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } int32_t endIndex = startIndex; - while(endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) { + while (endIndex < numOfBlocks && pStatisBlkArray->data[endIndex].minTbid.suid <= suid) { ++endIndex; } @@ -346,12 +346,12 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl int64_t st = taosGetTimestampUs(); - for(int32_t k = startIndex; k < endIndex; ++k) { + for (int32_t k = startIndex; k < endIndex; ++k) { tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[k], &block); int32_t i = 0; - int32_t rows = TARRAY2_SIZE(block.suid); - while (i < rows && block.suid->data[i] != suid) { + int32_t rows = block.numOfRecords; + while (i < rows && ((int64_t *)block.suids.data)[i] != suid) { ++i; } @@ -365,16 +365,19 @@ static int32_t loadSttStatisticsBlockData(SSttFileReader *pSttFileReader, SSttBl } if (pStatisBlkArray->data[k].maxTbid.suid == suid) { - taosArrayAddBatch(pBlockLoadInfo->info.pUid, &block.uid->data[i], rows - i); - taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i], rows - i); - taosArrayAddBatch(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i], rows - i); - taosArrayAddBatch(pBlockLoadInfo->info.pCount, &block.count->data[i], rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.suids, i * sizeof(int64_t)), rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pFirstKey, + tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t)), rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pLastKey, + tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t)), rows - i); + taosArrayAddBatch(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t)), rows - i); } else { - while (i < rows && block.suid->data[i] == suid) { - taosArrayPush(pBlockLoadInfo->info.pUid, &block.uid->data[i]); - taosArrayPush(pBlockLoadInfo->info.pFirstKey, &block.firstKey->data[i]); - taosArrayPush(pBlockLoadInfo->info.pLastKey, &block.lastKey->data[i]); - taosArrayPush(pBlockLoadInfo->info.pCount, &block.count->data[i]); + while (i < rows && ((int64_t *)block.suids.data)[i] == suid) { + taosArrayPush(pBlockLoadInfo->info.pUid, tBufferGetDataAt(&block.uids, i * sizeof(int64_t))); + taosArrayPush(pBlockLoadInfo->info.pFirstKey, + tBufferGetDataAt(&block.firstKeyTimestamps, i * sizeof(int64_t))); + taosArrayPush(pBlockLoadInfo->info.pLastKey, tBufferGetDataAt(&block.lastKeyTimestamps, i * sizeof(int64_t))); + taosArrayPush(pBlockLoadInfo->info.pCount, tBufferGetDataAt(&block.counts, i * sizeof(int64_t))); i += 1; } } @@ -433,14 +436,14 @@ static int32_t doLoadSttFilesBlk(SSttBlockLoadInfo *pBlockLoadInfo, SLDataIter * return code; } -static int32_t uidComparFn(const void* p1, const void* p2) { +static int32_t uidComparFn(const void *p1, const void *p2) { const uint64_t *pFirst = p1; const uint64_t *pVal = p2; if (*pFirst == *pVal) { return 0; } else { - return *pFirst < *pVal? -1:1; + return *pFirst < *pVal ? -1 : 1; } } @@ -455,7 +458,7 @@ static void setSttInfoForCurrentTable(SSttBlockLoadInfo *pLoadInfo, uint64_t uid pTimeWindow->skey = *(int64_t *)taosArrayGet(pLoadInfo->info.pFirstKey, index); pTimeWindow->ekey = *(int64_t *)taosArrayGet(pLoadInfo->info.pLastKey, index); - *numOfRows += *(int64_t*) taosArrayGet(pLoadInfo->info.pCount, index); + *numOfRows += *(int64_t *)taosArrayGet(pLoadInfo->info.pCount, index); } } @@ -709,7 +712,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { pIter->rInfo.uid = pBlockData->uid; pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); - _exit: +_exit: return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); } @@ -740,7 +743,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR return -1 * tLDataIterCmprFn(p1, p2); } -int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pSttDataInfo) { +int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pSttDataInfo) { int32_t code = TSDB_CODE_SUCCESS; pMTree->pIter = NULL; @@ -765,12 +768,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF for (int32_t j = 0; j < numOfLevels; ++j) { SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; - SArray * pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); + SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file SLDataIter *pIter = taosArrayGetP(pList, i); - SSttFileReader * pSttFileReader = pIter->pReader; + SSttFileReader *pSttFileReader = pIter->pReader; SSttBlockLoadInfo *pLoadInfo = pIter->pBlockLoadInfo; // open stt file reader if not opened yet @@ -796,7 +799,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF int64_t numOfRows = 0; int64_t cid = pSttLevel->fobjArr->data[i]->f->cid; - code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, pMTree->idStr); + code = tLDataIterOpen2(pIter, pSttFileReader, cid, pMTree->backward, pConf, pLoadInfo, &w, &numOfRows, + pMTree->idStr); if (code != TSDB_CODE_SUCCESS) { goto _end; } @@ -820,7 +824,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF return code; - _end: +_end: tMergeTreeClose(pMTree); return code; } @@ -829,8 +833,8 @@ void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTr bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; } -static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) { - SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; +static void tLDataIterPinSttBlock(SLDataIter *pIter, const char *id) { + SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo; if (pInfo->blockData[0].sttBlockIndex == pIter->iSttBlk) { pInfo->blockData[0].pin = true; @@ -842,15 +846,15 @@ static void tLDataIterPinSttBlock(SLDataIter* pIter, const char* id) { if (pInfo->blockData[1].sttBlockIndex == pIter->iSttBlk) { pInfo->blockData[1].pin = true; ASSERT(!pInfo->blockData[0].pin); - tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id); + tsdbTrace("pin stt-block, blockIndex:%d, stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id); return; } - tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%"PRId64" %s", pIter->iSttBlk, pIter->cid, id); + tsdbError("failed to pin any stt block, sttBlock:%d stt-fileVer:%" PRId64 " %s", pIter->iSttBlk, pIter->cid, id); } -static void tLDataIterUnpinSttBlock(SLDataIter* pIter, const char* id) { - SSttBlockLoadInfo* pInfo = pIter->pBlockLoadInfo; +static void tLDataIterUnpinSttBlock(SLDataIter *pIter, const char *id) { + SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo; if (pInfo->blockData[0].pin) { ASSERT(!pInfo->blockData[1].pin); pInfo->blockData[0].pin = false; @@ -883,7 +887,7 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) { return; } - SLDataIter* pIter = pMTree->pPinnedBlockIter; + SLDataIter *pIter = pMTree->pPinnedBlockIter; pMTree->pPinnedBlockIter = NULL; tLDataIterUnpinSttBlock(pIter, pMTree->idStr); } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index dc16953d24..95ac0f0133 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -356,7 +356,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record){ +static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { pBlockInfo->uid = record->uid; pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts; @@ -429,7 +429,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 } for (int32_t i = 0; i < numOfBlocks; ++i) { SFileDataBlockInfo blockInfo = {.tbBlockIdx = i}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); + SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); recordToBlockInfo(&blockInfo, record); taosArrayPush(pBlockIter->blockList, &blockInfo); @@ -464,7 +464,7 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 int32_t index = sup.indexPerTable[pos]++; SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); + SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); recordToBlockInfo(&blockInfo, record); taosArrayPush(pBlockIter->blockList, &blockInfo); @@ -722,11 +722,11 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo pBlockLoadInfo->cost.statisElapsedTime += el; int32_t index = 0; - while (index < TARRAY2_SIZE(pStatisBlock->suid) && pStatisBlock->suid->data[index] < suid) { + while (index < pStatisBlock->numOfRecords && ((int64_t*)pStatisBlock->suids.data)[index] < suid) { ++index; } - if (index >= TARRAY2_SIZE(pStatisBlock->suid)) { + if (index >= pStatisBlock->numOfRecords) { tStatisBlockDestroy(pStatisBlock); taosMemoryFreeClear(pStatisBlock); return num; @@ -744,14 +744,14 @@ int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo uint64_t uid = pUidList[uidIndex]; - if (pStatisBlock->uid->data[j] == uid) { - num += pStatisBlock->count->data[j]; + if (((int64_t*)pStatisBlock->uids.data)[j] == uid) { + num += ((int64_t*)pStatisBlock->counts.data)[j]; uidIndex += 1; j += 1; - loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j); - } else if (pStatisBlock->uid->data[j] < uid) { + loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); + } else if (((int64_t*)pStatisBlock->uids.data)[j] < uid) { j += 1; - loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->suid->size, &i, &j); + loadNextStatisticsBlock(pSttFileReader, pStatisBlock, pStatisBlkArray, pStatisBlock->numOfRecords, &i, &j); } else { uidIndex += 1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index b56564dbd8..e37c73d2af 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -29,7 +29,8 @@ struct SSttFileReader { TSttBlkArray sttBlkArray[1]; TStatisBlkArray statisBlkArray[1]; TTombBlkArray tombBlkArray[1]; - uint8_t *bufArr[5]; + uint8_t *bufArr[5]; // TODO: remove here + SBuffer *buffers; }; // SSttFileReader @@ -471,27 +472,70 @@ _exit: int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) { int32_t code = 0; int32_t lino = 0; - - code = tRealloc(&reader->config->bufArr[0], statisBlk->dp->size); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->config->bufArr[0], statisBlk->dp->size, 0); - TSDB_CHECK_CODE(code, lino, _exit); - int64_t size = 0; + + // load data from file + code = tBufferEnsureCapacity(&reader->buffers[0], statisBlk->dp->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->buffers[0].data, statisBlk->dp->size, 0); + TSDB_CHECK_CODE(code, lino, _exit); + + // decode data tStatisBlockClear(statisBlock); - for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { - code = - tsdbDecmprData(reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg, - &reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec, &reader->config->bufArr[2]); - TSDB_CHECK_CODE(code, lino, _exit); + statisBlock->numOfPKs = statisBlk->numOfPKs; + statisBlock->numOfRecords = statisBlk->numRec; - code = TARRAY2_APPEND_BATCH(statisBlock->dataArr + i, reader->config->bufArr[1], statisBlk->numRec); - TSDB_CHECK_CODE(code, lino, _exit); + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { + SCompressInfo info = { + .dataType = TSDB_DATA_TYPE_BIGINT, + .cmprAlg = statisBlk->cmprAlg, + .compressedSize = statisBlk->size[i], + .originalSize = statisBlk->numRec * sizeof(int64_t), + }; + code = tDecompressData(tBufferGetDataAt(&reader->buffers[0], size), statisBlk->size[i], &info, + &statisBlock->buffers[i], &reader->buffers[1]); + TSDB_CHECK_CODE(code, lino, _exit); size += statisBlk->size[i]; } + if (statisBlk->numOfPKs > 0) { + SValueColumnCompressInfo firstKeyInfos[TD_MAX_PRIMARY_KEY_COL]; + SValueColumnCompressInfo lastKeyInfos[TD_MAX_PRIMARY_KEY_COL]; + SBufferReader bfReader; + + tBufferReaderInit(&bfReader, true, size, &reader->buffers[0]); + + // decode compress info + for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { + code = tValueColumnCompressInfoDecode(&bfReader, &firstKeyInfos[i]); + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { + code = tValueColumnCompressInfoDecode(&bfReader, &lastKeyInfos[i]); + TSDB_CHECK_CODE(code, lino, _exit); + } + + size = bfReader.offset; + + // decode value columns + for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { + code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), firstKeyInfos[i].compressedDataSize, + &firstKeyInfos[i], &statisBlock->firstKeyPKs[i], &reader->buffers[1]); + TSDB_CHECK_CODE(code, lino, _exit); + size += firstKeyInfos[i].compressedDataSize; + } + + for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { + code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), lastKeyInfos[i].compressedDataSize, + &lastKeyInfos[i], &statisBlock->lastKeyPKs[i], &reader->buffers[1]); + TSDB_CHECK_CODE(code, lino, _exit); + size += lastKeyInfos[i].compressedDataSize; + } + } + ASSERT(size == statisBlk->dp->size); _exit: @@ -524,7 +568,8 @@ struct SSttFileWriter { // helper data SSkmInfo skmTb[1]; SSkmInfo skmRow[1]; - uint8_t *bufArr[5]; + uint8_t *bufArr[5]; // TODO: remove this + SBuffer *buffers; }; static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize, @@ -595,45 +640,88 @@ _exit: } static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { - if (STATIS_BLOCK_SIZE(writer->staticBlock) == 0) return 0; + int32_t code = 0; + int32_t lino = 0; + STbStatisRecord record; + STbStatisBlock *statisBlock = writer->staticBlock; + SStatisBlk statisBlk = {0}; - int32_t code = 0; - int32_t lino = 0; - - SStatisBlk statisBlk[1] = {{ - .dp[0] = - { - .offset = writer->file->size, - .size = 0, - }, - .minTbid = - { - .suid = TARRAY2_FIRST(writer->staticBlock->suid), - .uid = TARRAY2_FIRST(writer->staticBlock->uid), - }, - .maxTbid = - { - .suid = TARRAY2_LAST(writer->staticBlock->suid), - .uid = TARRAY2_LAST(writer->staticBlock->uid), - }, - .numRec = STATIS_BLOCK_SIZE(writer->staticBlock), - .cmprAlg = writer->config->cmprAlg, - }}; - - for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) { - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->staticBlock->dataArr + i), - TARRAY2_DATA_LEN(&writer->staticBlock->dataArr[i]), TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg, - &writer->config->bufArr[0], 0, &statisBlk->size[i], &writer->config->bufArr[1]); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[0], statisBlk->size[i]); - TSDB_CHECK_CODE(code, lino, _exit); - - statisBlk->dp->size += statisBlk->size[i]; - writer->file->size += statisBlk->size[i]; + if (statisBlock->numOfRecords == 0) { + return 0; } - code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk); + statisBlk.dp->offset = writer->file->size; + statisBlk.dp->size = 0; + statisBlk.numRec = statisBlock->numOfRecords; + statisBlk.cmprAlg = writer->config->cmprAlg; + statisBlk.numOfPKs = statisBlock->numOfPKs; + + tStatisBlockGet(statisBlock, 0, &record); + statisBlk.minTbid.suid = record.suid; + statisBlk.minTbid.uid = record.uid; + + tStatisBlockGet(statisBlock, statisBlock->numOfRecords - 1, &record); + statisBlk.maxTbid.suid = record.suid; + statisBlk.maxTbid.uid = record.uid; + + // compress each column + for (int32_t i = 0; i < ARRAY_SIZE(statisBlk.size); i++) { + SCompressInfo info = { + .dataType = TSDB_DATA_TYPE_BIGINT, + .cmprAlg = statisBlk.cmprAlg, + }; + + tBufferClear(&writer->buffers[0]); + code = tCompressData(tBufferGetData(&statisBlock->buffers[i]), tBufferGetSize(&statisBlock->buffers[i]), &info, + &writer->buffers[0], &writer->buffers[1]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize); + TSDB_CHECK_CODE(code, lino, _exit); + + statisBlk.size[i] = info.compressedSize; + statisBlk.dp->size += info.compressedSize; + writer->file->size += info.compressedSize; + } + + // compress primary keys + if (statisBlk.numOfPKs > 0) { + SBufferWriter bfWriter; + SValueColumnCompressInfo compressInfo = {.cmprAlg = statisBlk.cmprAlg}; + + tBufferClear(&writer->buffers[0]); + tBufferClear(&writer->buffers[1]); + tBufferWriterInit(&bfWriter, true, 0, &writer->buffers[0]); + + for (int32_t i = 0; i < statisBlk.numOfPKs; i++) { + code = + tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t i = 0; i < statisBlk.numOfPKs; i++) { + code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tValueColumnCompressInfoEncode(&compressInfo, &bfWriter); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[0].data, writer->buffers[0].size); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file->size += writer->buffers[0].size; + statisBlk.dp->size += writer->buffers[0].size; + + code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[1].data, writer->buffers[1].size); + TSDB_CHECK_CODE(code, lino, _exit); + writer->file->size += writer->buffers[1].size; + statisBlk.dp->size += writer->buffers[1].size; + } + + code = TARRAY2_APPEND_PTR(writer->statisBlkArray, &statisBlk); TSDB_CHECK_CODE(code, lino, _exit); tStatisBlockClear(writer->staticBlock); @@ -918,14 +1006,8 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) { TSDB_CHECK_CODE(code, lino, _exit); } - TSDBKEY key[1]; - if (row->row.type == TSDBROW_ROW_FMT) { - key->ts = row->row.pTSRow->ts; - key->version = row->row.version; - } else { - key->ts = row->row.pBlockData->aTSKEY[row->row.iRow]; - key->version = row->row.pBlockData->aVersion[row->row.iRow]; - } + STsdbRowKey key; + tsdbRowGetKey(&row->row, &key); if (writer->ctx->tbid->uid != row->uid) { writer->ctx->tbid->suid = row->suid; @@ -939,18 +1021,22 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) { STbStatisRecord record = { .suid = row->suid, .uid = row->uid, - .firstKey = key->ts, - .lastKey = key->ts, + .firstKey = key.key, + .lastKey = key.key, .count = 1, }; code = tStatisBlockPut(writer->staticBlock, &record); TSDB_CHECK_CODE(code, lino, _exit); } else { - ASSERT(key->ts >= TARRAY2_LAST(writer->staticBlock->lastKey)); + // update last key and count + STbStatisRecord record; - if (key->ts > TARRAY2_LAST(writer->staticBlock->lastKey)) { - TARRAY2_LAST(writer->staticBlock->count)++; - TARRAY2_LAST(writer->staticBlock->lastKey) = key->ts; + tStatisBlockGet(writer->staticBlock, writer->staticBlock->numOfRecords, &record); + + if (tRowKeyCmpr(&key.key, &record.lastKey) > 0) { + // TODO: update count and last key + // TARRAY2_LAST(writer->staticBlock->count)++; + // TARRAY2_LAST(writer->staticBlock->lastKey) = key->ts; } } @@ -961,7 +1047,7 @@ int32_t tsdbSttFileWriteRow(SSttFileWriter *writer, SRowInfo *row) { } // row to col conversion - if (key->version <= writer->config->compactVersion // + if (key.version <= writer->config->compactVersion // && writer->blockData->nRow > 0 // && (writer->blockData->uid // ? writer->blockData->uid // diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index 38d4e18669..058b3e0b2f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -66,40 +66,103 @@ int32_t tTombRecordCompare(const STombRecord *r1, const STombRecord *r2) { // STbStatisBlock ---------- int32_t tStatisBlockInit(STbStatisBlock *statisBlock) { - for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { - TARRAY2_INIT(&statisBlock->dataArr[i]); + statisBlock->numOfPKs = 0; + statisBlock->numOfRecords = 0; + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { + tBufferInit(&statisBlock->buffers[i]); + } + for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) { + tValueColumnInit(&statisBlock->firstKeyPKs[i]); + tValueColumnInit(&statisBlock->lastKeyPKs[i]); } return 0; } int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock) { - for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { - TARRAY2_DESTROY(&statisBlock->dataArr[i], NULL); + statisBlock->numOfPKs = 0; + statisBlock->numOfRecords = 0; + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { + tBufferDestroy(&statisBlock->buffers[i]); + } + for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) { + tValueColumnDestroy(&statisBlock->firstKeyPKs[i]); + tValueColumnDestroy(&statisBlock->lastKeyPKs[i]); } return 0; } int32_t tStatisBlockClear(STbStatisBlock *statisBlock) { - for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { - TARRAY2_CLEAR(&statisBlock->dataArr[i], NULL); + statisBlock->numOfPKs = 0; + statisBlock->numOfRecords = 0; + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { + tBufferClear(&statisBlock->buffers[i]); + } + for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) { + tValueColumnClear(&statisBlock->firstKeyPKs[i]); + tValueColumnClear(&statisBlock->lastKeyPKs[i]); } return 0; } int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *record) { int32_t code; - for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { - code = TARRAY2_APPEND(&statisBlock->dataArr[i], record->dataArr[i]); + + if (statisBlock->numOfRecords == 0) { + statisBlock->numOfPKs = record->firstKey.numOfPKs; + } + + ASSERT(statisBlock->numOfPKs == record->firstKey.numOfPKs); + ASSERT(statisBlock->numOfPKs == record->lastKey.numOfPKs); + + code = tBufferAppend(&statisBlock->suids, &record->suid, sizeof(record->suid)); + if (code) return code; + code = tBufferAppend(&statisBlock->uids, &record->uid, sizeof(record->uid)); + if (code) return code; + code = tBufferAppend(&statisBlock->firstKeyTimestamps, &record->firstKey.ts, sizeof(record->firstKey.ts)); + if (code) return code; + code = tBufferAppend(&statisBlock->lastKeyTimestamps, &record->lastKey.ts, sizeof(record->lastKey.ts)); + if (code) return code; + code = tBufferAppend(&statisBlock->counts, &record->count, sizeof(record->count)); + if (code) return code; + + for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) { + code = tValueColumnAppend(&statisBlock->firstKeyPKs[i], &record->firstKey.pks[i]); + if (code) return code; + code = tValueColumnAppend(&statisBlock->lastKeyPKs[i], &record->lastKey.pks[i]); if (code) return code; } + + statisBlock->numOfRecords++; return 0; } int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) { - if (idx >= STATIS_BLOCK_SIZE(statisBlock)) return TSDB_CODE_OUT_OF_RANGE; - for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; ++i) { - record->dataArr[i] = TARRAY2_GET(&statisBlock->dataArr[i], idx); + int32_t code; + + if (idx < 0 || idx >= statisBlock->numOfRecords) { + return TSDB_CODE_OUT_OF_RANGE; } + + code = tBufferGet(&statisBlock->suids, idx, sizeof(record->suid), &record->suid); + if (code) return code; + code = tBufferGet(&statisBlock->uids, idx, sizeof(record->uid), &record->uid); + if (code) return code; + code = tBufferGet(&statisBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.ts), &record->firstKey.ts); + if (code) return code; + code = tBufferGet(&statisBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.ts), &record->lastKey.ts); + if (code) return code; + code = tBufferGet(&statisBlock->counts, idx, sizeof(record->count), &record->count); + if (code) return code; + + record->firstKey.numOfPKs = statisBlock->numOfPKs; + record->lastKey.numOfPKs = statisBlock->numOfPKs; + for (int32_t i = 0; i < statisBlock->numOfPKs; ++i) { + code = tValueColumnGet(&statisBlock->firstKeyPKs[i], idx, &record->firstKey.pks[i]); + if (code) return code; + code = tValueColumnGet(&statisBlock->lastKeyPKs[i], idx, &record->lastKey.pks[i]); + if (code) return code; + } + return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index 0b2d23f593..817dbdca65 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -70,27 +70,29 @@ int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record); int32_t tTombRecordCompare(const STombRecord *record1, const STombRecord *record2); // STbStatisRecord ---------- -#define STATIS_RECORD_NUM_ELEM 5 -typedef union { - int64_t dataArr[STATIS_RECORD_NUM_ELEM]; - struct { - int64_t suid; - int64_t uid; - int64_t firstKey; - int64_t lastKey; - int64_t count; - }; +typedef struct { + int64_t suid; + int64_t uid; + SRowKey firstKey; + SRowKey lastKey; + int64_t count; } STbStatisRecord; -typedef union { - TARRAY2(int64_t) dataArr[STATIS_RECORD_NUM_ELEM]; - struct { - TARRAY2(int64_t) suid[1]; - TARRAY2(int64_t) uid[1]; - TARRAY2(int64_t) firstKey[1]; - TARRAY2(int64_t) lastKey[1]; - TARRAY2(int64_t) count[1]; +typedef struct { + int8_t numOfPKs; + int32_t numOfRecords; + union { + SBuffer buffers[5]; + struct { + SBuffer suids; + SBuffer uids; + SBuffer firstKeyTimestamps; + SBuffer lastKeyTimestamps; + SBuffer counts; + }; }; + SValueColumn firstKeyPKs[TD_MAX_PRIMARY_KEY_COL]; + SValueColumn lastKeyPKs[TD_MAX_PRIMARY_KEY_COL]; } STbStatisBlock; typedef struct { @@ -98,12 +100,13 @@ typedef struct { TABLEID minTbid; TABLEID maxTbid; int32_t numRec; - int32_t size[STATIS_RECORD_NUM_ELEM]; + int32_t size[5]; int8_t cmprAlg; - int8_t rsvd[7]; + int8_t numOfPKs; // number of primary keys + int8_t rsvd[6]; } SStatisBlk; -#define STATIS_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) +#define STATIS_BLOCK_SIZE(db) ((db)->numOfRecords) int32_t tStatisBlockInit(STbStatisBlock *statisBlock); int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock);