Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
kailixu 2024-04-08 18:23:23 +08:00
commit fcb605cac6
5 changed files with 52 additions and 32 deletions

View File

@ -535,8 +535,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
if (asc) { if (asc) {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int32_t*) skey; GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, skey);
pDataBlock->info.pks[1].val = *(int32_t*) ekey; GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, ekey);
} else { // todo refactor } else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey)); memcpy(pDataBlock->info.pks[0].pData, varDataVal(skey), varDataLen(skey));
pDataBlock->info.pks[0].nData = varDataLen(skey); pDataBlock->info.pks[0].nData = varDataLen(skey);
@ -546,8 +546,8 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b
} }
} else { } else {
if (IS_NUMERIC_TYPE(pColInfoData->info.type)) { if (IS_NUMERIC_TYPE(pColInfoData->info.type)) {
pDataBlock->info.pks[0].val = *(int32_t*) ekey; GET_TYPED_DATA(pDataBlock->info.pks[0].val, int64_t, pColInfoData->info.type, ekey);
pDataBlock->info.pks[1].val = *(int32_t*) skey; GET_TYPED_DATA(pDataBlock->info.pks[1].val, int64_t, pColInfoData->info.type, skey);
} else { // todo refactor } else { // todo refactor
memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey)); memcpy(pDataBlock->info.pks[0].pData, varDataVal(ekey), varDataLen(ekey));
pDataBlock->info.pks[0].nData = varDataLen(ekey); pDataBlock->info.pks[0].nData = varDataLen(ekey);
@ -1491,6 +1491,18 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
blockDataAppendColInfo(pBlock, &colInfo); blockDataAppendColInfo(pBlock, &colInfo);
} }
// prepare the pk buffer if necessary
if (IS_VAR_DATA_TYPE(pDataBlock->info.pks[0].type)) {
SValue* pVal = &pBlock->info.pks[0];
pVal->type = pDataBlock->info.pks[0].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[0].nData);
pVal = &pBlock->info.pks[1];
pVal->type = pDataBlock->info.pks[1].type;
pVal->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
}
if (copyData) { if (copyData) {
int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows); int32_t code = blockDataEnsureCapacity(pBlock, pDataBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -1678,11 +1678,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
__compar_fn_t compFn = pReader->pkComparFn; __compar_fn_t compFn = pReader->pkComparFn;
int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot; int32_t pkSrcSlot = pReader->suppInfo.pkSrcSlot;
SRowKey* pSttKey = &(SRowKey){0}; SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
pSttKey = getCurrentKeyInSttBlock(pSttBlockReader); pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
} else {
pSttKey = NULL;
} }
SRowKey k; SRowKey k;
@ -1714,10 +1712,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} }
SRowKey minKey; SRowKey minKey = k;
if (pReader->info.order == TSDB_ORDER_ASC) { if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = k; // chosen the minimum value
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) { if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) < 0) {
minKey = *pfKey; minKey = *pfKey;
} }
@ -1726,8 +1722,6 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = *pSttKey; minKey = *pSttKey;
} }
} else { } else {
minKey = k;
if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) { if (pfKey != NULL && pkCompEx(compFn, pfKey, &minKey) > 0) {
minKey = *pfKey; minKey = *pfKey;
} }
@ -1882,11 +1876,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
SRowKey* pSttKey = &(SRowKey){0}; SRowKey* pSttKey = NULL;
if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) { if (hasDataInSttBlock(pBlockScanInfo) && (!pBlockScanInfo->cleanSttBlocks)) {
tRowKeyAssign(pSttKey, getCurrentKeyInSttBlock(pSttBlockReader)); pSttKey = getCurrentKeyInSttBlock(pSttBlockReader);
} else {
pSttKey = NULL;
} }
SRowKey* pfKey = &(SRowKey){0}; SRowKey* pfKey = &(SRowKey){0};

View File

@ -162,6 +162,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode); SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode);
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI); int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI);

View File

@ -250,6 +250,34 @@ SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) {
return pBlock; return pBlock;
} }
int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo) {
SDataBlockInfo* pBlockInfo = &pDataBlock->info;
for (int32_t i = 0; i < taosArrayGetSize(pMatchInfo->pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pMatchInfo->pList, i);
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pDataBlock->pDataBlock, pItem->dstSlotId);
pBlockInfo->pks[0].type = pInfoData->info.type;
pBlockInfo->pks[1].type = pInfoData->info.type;
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
if (pBlockInfo->pks[0].pData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
if (pBlockInfo->pks[1].pData == NULL) {
taosMemoryFreeClear(pBlockInfo->pks[0].pData);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
}
}
return TSDB_CODE_SUCCESS;
}
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) { EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
SMetaReader* mr = (SMetaReader*)pContext; SMetaReader* mr = (SMetaReader*)pContext;
if (nodeType(*pNode) == QUERY_NODE_COLUMN) { if (nodeType(*pNode) == QUERY_NODE_COLUMN) {

View File

@ -1196,23 +1196,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader; pInfo->base.readerAPI = pTaskInfo->storageAPI.tsdReader;
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
{ // todo :refactor:
SDataBlockInfo* pBlockInfo = &pInfo->pResBlock->info;
for(int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->base.matchInfo.pList, i);
if (pItem->isPk) {
SColumnInfoData* pInfoData = taosArrayGet(pInfo->pResBlock->pDataBlock, pItem->dstSlotId);
pBlockInfo->pks[0].type = pInfoData->info.type;
pBlockInfo->pks[1].type = pInfoData->info.type;
if (IS_VAR_DATA_TYPE(pItem->dataType.type)) {
pBlockInfo->pks[0].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
pBlockInfo->pks[1].pData = taosMemoryCalloc(1, pInfoData->info.bytes);
}
}
}
}
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
@ -4418,6 +4403,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->bSortRowId = false; pInfo->bSortRowId = false;
} }
prepareDataBlockBuf(pInfo->pResBlock, &pInfo->base.matchInfo);
pInfo->pSortInfo = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pSortInfo = generateSortByTsPkInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);