From b4ea80b637592f3efd725261fd7d18640f4a607c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 17:09:07 +0800 Subject: [PATCH] fix(tsdb): prepare the pk buf for blocks generated by reader. --- source/common/src/tdatablock.c | 2 ++ source/dnode/vnode/src/tsdb/tsdbRead2.c | 38 +++++++++++++++++++------ source/libs/executor/src/executil.c | 2 ++ 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8d9ef6831d..20c4fa64c4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1502,10 +1502,12 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pVal->type = pDataBlock->info.pks[0].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); + pVal->nData = pDataBlock->info.pks[0].nData; pVal = &pBlock->info.pks[1]; pVal->type = pDataBlock->info.pks[1].type; pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + pVal->nData = pDataBlock->info.pks[1].nData; } if (copyData) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 3409559867..54d5c54788 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -485,7 +485,7 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) { } static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, - SQueryTableDataCond* pCond) { + SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) { pResBlockInfo->capacity = capacity; pResBlockInfo->pResBlock = pResBlock; terrno = 0; @@ -493,6 +493,28 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit if (pResBlockInfo->pResBlock == NULL) { pResBlockInfo->freeBlock = true; pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); + + if (pSup->numOfPks > 0) { + SSDataBlock* p = pResBlockInfo->pResBlock; + p->info.pks[0].type = pSup->pk.type; + p->info.pks[1].type = pSup->pk.type; + + if (IS_VAR_DATA_TYPE(pSup->pk.type)) { + p->info.pks[0].pData = taosMemoryCalloc(1, pSup->pk.bytes); + if (p->info.pks[0].pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->info.pks[1].pData = taosMemoryCalloc(1, pSup->pk.bytes); + if (p->info.pks[1].pData == NULL) { + taosMemoryFreeClear(p->info.pks[0].pData); + return TSDB_CODE_OUT_OF_MEMORY; + } + + p->info.pks[0].nData = pSup->pk.bytes; + p->info.pks[1].nData = pSup->pk.bytes; + } + } } else { pResBlockInfo->freeBlock = false; } @@ -525,14 +547,9 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL; pReader->type = pCond->type; - + pReader->bFilesetDelimited = false; pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket - code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond); - if (code != TSDB_CODE_SUCCESS) { - goto _end; - } - if (pCond->numOfCols <= 0) { tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr); code = TSDB_CODE_INVALID_PARA; @@ -548,6 +565,11 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void pReader->pkComparFn = getComparFunc(pSup->pk.type, 0); } + code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + code = tBlockDataCreate(&pReader->status.fileBlockData); if (code != TSDB_CODE_SUCCESS) { terrno = code; @@ -569,8 +591,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } - pReader->bFilesetDelimited = false; - tsdbInitReaderLock(pReader); tsem_init(&pReader->resumeAfterSuspend, 0, 0); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index a9ab8b783c..d152baf502 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -276,6 +276,8 @@ int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) pBlockInfo->pks[0].nData = pInfoData->info.bytes; pBlockInfo->pks[1].nData = pInfoData->info.bytes; } + + break; } }