diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e748cce643..69b2a2e6a3 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -535,8 +535,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b if (asc) { if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { - pDataBlock->info.pks[0].val = *(int32_t*) skey; - pDataBlock->info.pks[1].val = *(int32_t*) ekey; + GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, skey); + GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, ekey); } else { // todo refactor memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey)); pDataBlock->info.pks[0].nData = varDataLen(skey); @@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b } } else { if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { - pDataBlock->info.pks[0].val = *(int32_t*) ekey; - pDataBlock->info.pks[1].val = *(int32_t*) skey; + GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, ekey); + GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, skey); } else { // todo refactor memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey)); pDataBlock->info.pks[0].nData = varDataLen(ekey); @@ -1491,6 +1491,18 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { blockDataAppendColInfo(pBlock, &colInfo); } + // prepare the pk buffer if necessary + if (IS_VAR_DATA_TYPE(pDataBlock->info.pks[0].type)) { + SValue* pVal = &pBlock->info.pks[0]; + + pVal->type = pDataBlock->info.pks[0].type; + pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData); + + pVal = &pBlock->info.pks[1]; + pVal->type = pDataBlock->info.pks[1].type; + pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData); + } + if (copyData) { int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 57d4121880..a8a4ced517 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -1678,11 +1678,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* __compar_fn_t compFn = pReader->pkComparFn; int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; - SRowKey* pSttKey = &(SRowKey){0}; + SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); - } else { - pSttKey = NULL; } SRowKey k; @@ -1714,10 +1712,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - SRowKey minKey; + SRowKey minKey = k; if (pReader->info.order == TSDB_ORDER_ASC) { - minKey = k; // chosen the minimum value - if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) { minKey = *pfKey; } @@ -1726,8 +1722,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* minKey = *pSttKey; } } else { - minKey = k; - if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) { minKey = *pfKey; } @@ -1882,11 +1876,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); - SRowKey* pSttKey = &(SRowKey){0}; + SRowKey* pSttKey = NULL; if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { - tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader)); - } else { - pSttKey = NULL; + pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); } SRowKey* pfKey = &(SRowKey){0}; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 1c70fd8bb6..55b803f6d4 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -162,6 +162,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode); +int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 97b9b00efb..9749dffc13 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -250,6 +250,34 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { return pBlock; } +int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) { + SDataBlockInfo* pBlockInfo = &pDataBlock->info; + + for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) { + SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i); + if (pItem->isPk) { + SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId); + pBlockInfo->pks[0].type = pInfoData->info.type; + pBlockInfo->pks[1].type = pInfoData->info.type; + + if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { + pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + if (pBlockInfo->pks[0].pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes); + if (pBlockInfo->pks[1].pData == NULL) { + taosMemoryFreeClear(pBlockInfo->pks[0].pData); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + } + } + + return TSDB_CODE_SUCCESS; +} + EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { SMetaReader* mr = (SMetaReader*)pContext; if (nodeType(*pNode) == QUERY_NODE_COLUMN) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a51e627272..0d4c536dec 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1196,23 +1196,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); + prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo); - { // todo :refactor: - SDataBlockInfo* pBlockInfo = &pInfo->pResBlock->info; - for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) { - SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i); - if (pItem->isPk) { - SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId); - pBlockInfo->pks[0].type = pInfoData->info.type; - pBlockInfo->pks[1].type = pInfoData->info.type; - - if (IS_VAR_DATA_TYPE(pItem->dataType.type)) { - pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes); - pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes); - } - } - } - } code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4418,6 +4403,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->bSortRowId = false; } + prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo); + pInfo->pSortInfo = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);