diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e0d681805c..0b5fac1216 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -162,14 +162,20 @@ typedef struct SGroupCacheLogicNode { bool grpColsMayBeNull; bool grpByUid; bool globalGrp; + bool batchFetch; SNodeList* pGroupCols; } SGroupCacheLogicNode; -typedef struct SDynQueryCtrlLogicNode { - SLogicNode node; - EDynQueryType qType; +typedef struct SDynQueryCtrlStbJoin { + bool batchJoin; SNodeList* pVgList; SNodeList* pUidList; +} SDynQueryCtrlStbJoin; + +typedef struct SDynQueryCtrlLogicNode { + SLogicNode node; + EDynQueryType qType; + SDynQueryCtrlStbJoin stbJoin; } SDynQueryCtrlLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; @@ -445,13 +451,14 @@ typedef struct SGroupCachePhysiNode { bool grpColsMayBeNull; bool grpByUid; bool globalGrp; + bool batchFetch; SNodeList* pGroupCols; } SGroupCachePhysiNode; typedef struct SStbJoinDynCtrlBasic { + bool batchJoin; int32_t vgSlot[2]; int32_t uidSlot[2]; - bool batchJoin; } SStbJoinDynCtrlBasic; typedef struct SDynQueryCtrlPhysiNode { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 0b2d1d2747..86627b6578 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -582,7 +582,6 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); - pInfo->stbJoin.basic.batchJoin = true; if (pInfo->stbJoin.basic.batchJoin) { code = initBatchStbJoinVgHash(&pInfo->stbJoin.ctx.prev); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 81a77dba27..c6a01db880 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -280,6 +280,8 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p } if (pBlock) { + qError("%s group cache retrieved block with groupId: %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); + pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock); @@ -342,6 +344,60 @@ static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup->startOffset = pGroup->pVgCtx->fileSize; } + +static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId) { + taosThreadMutexInit(&pGroup->mutex, NULL); + pGroup->downstreamIdx = downstreamIdx; + pGroup->vgId = vgId; + pGroup->fileId = -1; + pGroup->startBlkId = -1; + pGroup->endBlkId = -1; + pGroup->startOffset = -1; + pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); +} + +static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + SGroupCacheData grpData = {0}; + initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId); + + while (true) { + if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) { + if (terrno == TSDB_CODE_DUP_KEY) { + *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); + if (*ppGrp) { + break; + } + } else { + return terrno; + } + } + + *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); + if (*ppGrp && pParam->pChildren) { + SGcNewGroupInfo newGroup; + newGroup.pGroup = *ppGrp; + newGroup.vgId = vgId; + newGroup.uid = uid; + newGroup.pParam = taosArrayGetP(pParam->pChildren, 0); + + taosWLockLatch(&pCtx->grpLock); + if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) { + taosWUnLockLatch(&pCtx->grpLock); + return TSDB_CODE_OUT_OF_MEMORY; + } + taosWUnLockLatch(&pCtx->grpLock); + + break; + } + } + + return TSDB_CODE_SUCCESS; +} + + static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) { int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; @@ -594,58 +650,6 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId) { - taosThreadMutexInit(&pGroup->mutex, NULL); - pGroup->downstreamIdx = downstreamIdx; - pGroup->vgId = vgId; - pGroup->fileId = -1; - pGroup->startBlkId = -1; - pGroup->endBlkId = -1; - pGroup->startOffset = -1; - pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); -} - -static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) { - SGroupCacheOperatorInfo* pGCache = pOperator->info; - SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; - SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; - SGroupCacheData grpData = {0}; - initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId); - - while (true) { - if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) { - if (terrno == TSDB_CODE_DUP_KEY) { - *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); - if (*ppGrp) { - break; - } - } else { - return terrno; - } - } - - *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); - if (*ppGrp && pParam->pChildren) { - SGcNewGroupInfo newGroup; - newGroup.pGroup = *ppGrp; - newGroup.vgId = vgId; - newGroup.uid = uid; - newGroup.pParam = taosArrayGetP(pParam->pChildren, 0); - - taosWLockLatch(&pCtx->grpLock); - if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) { - taosWUnLockLatch(&pCtx->grpLock); - return TSDB_CODE_OUT_OF_MEMORY; - } - taosWUnLockLatch(&pCtx->grpLock); - - break; - } - } - - return TSDB_CODE_SUCCESS; -} - static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) { pSession->pParam = pGcParam; pSession->downstreamIdx = pGcParam->downstreamIdx; @@ -664,7 +668,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); if (NULL == pGroup) { - code = addNewGroupData(pOperator, pParam, &pGroup, pGcParam->vgId, pGcParam->tbUid); + code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -733,6 +737,11 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (NULL == pInfo->pDownstreams[i].pVgTbHash) { return TSDB_CODE_OUT_OF_MEMORY; } + if (pInfo->batchFetch) { + SGcVgroupCtx vgCtx = {.pTbList = NULL, .lastUid = 0, .fileSize = 0, .fileId = i}; + int32_t defaultVg = -1; + tSimpleHashPut(pInfo->pDownstreams[i].pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); + } pInfo->pDownstreams[i].pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo)); if (NULL == pInfo->pDownstreams[i].pNewGrpList) { @@ -795,7 +804,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->maxCacheSize = -1; pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->globalGrp = pPhyciNode->globalGrp; - pInfo->batchFetch = false; + pInfo->batchFetch = pPhyciNode->batchFetch; if (!pInfo->grpByUid) { qError("only group cache by uid is supported now"); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 43b086b7c0..90a7cebf81 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -852,8 +852,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { if (pOperator->dynamicTask) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); - result->info.id.groupId = pKeyInfo->uid; + result->info.id.groupId = result->info.id.uid; } return result; } @@ -891,8 +890,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { if (pOperator->dynamicTask) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); - result->info.id.groupId = pKeyInfo->uid; + result->info.id.groupId = result->info.id.uid; } return result; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 9463be6755..64f5db09c3 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -540,6 +540,7 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache COPY_SCALAR_FIELD(grpColsMayBeNull); COPY_SCALAR_FIELD(grpByUid); COPY_SCALAR_FIELD(globalGrp); + COPY_SCALAR_FIELD(batchFetch); CLONE_NODE_LIST_FIELD(pGroupCols); return TSDB_CODE_SUCCESS; } @@ -547,8 +548,9 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(qType); - CLONE_NODE_LIST_FIELD(pVgList); - CLONE_NODE_LIST_FIELD(pUidList); + COPY_SCALAR_FIELD(stbJoin.batchJoin); + CLONE_NODE_LIST_FIELD(stbJoin.pVgList); + CLONE_NODE_LIST_FIELD(stbJoin.pUidList); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index ef3181f412..c58c3d7077 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1228,6 +1228,9 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) { } static const char* jkDynQueryCtrlLogicPlanQueryType = "QueryType"; +static const char* jkDynQueryCtrlLogicPlanStbJoinBatchJoin = "BatchJoin"; +static const char* jkDynQueryCtrlLogicPlanStbJoinVgList = "VgroupList"; +static const char* jkDynQueryCtrlLogicPlanStbJoinUidList = "UidList"; static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { const SDynQueryCtrlLogicNode* pNode = (const SDynQueryCtrlLogicNode*)pObj; @@ -1236,6 +1239,15 @@ static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, pNode->stbJoin.batchJoin); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, pNode->stbJoin.pVgList); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkDynQueryCtrlLogicPlanStbJoinUidList, pNode->stbJoin.pUidList); + } return code; } @@ -1247,6 +1259,15 @@ static int32_t jsonToLogicDynQueryCtrlNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType, code); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetBoolValue(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, &pNode->stbJoin.batchJoin); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, &pNode->stbJoin.pVgList); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkDynQueryCtrlLogicPlanStbJoinUidList, &pNode->stbJoin.pUidList); + } return code; } @@ -2929,6 +2950,7 @@ static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns"; static const char* jkGroupCachePhysiPlanGrpColsMayBeNull = "GroupColumnsMayBeNull"; static const char* jkGroupCachePhysiPlanGroupByUid = "GroupByUid"; static const char* jkGroupCachePhysiPlanGlobalGroup = "GlobalGroup"; +static const char* jkGroupCachePhysiPlanBatchFetch = "BatchFetch"; static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { @@ -2944,6 +2966,9 @@ static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGlobalGroup, pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanBatchFetch, pNode->batchFetch); + } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols); } @@ -2963,6 +2988,9 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGlobalGroup, &pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanBatchFetch, &pNode->batchFetch); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols); } @@ -2970,6 +2998,7 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { } static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType"; +static const char* jkDynQueryCtrlPhysiPlanBatchJoin = "BatchJoin"; static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]"; static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]"; static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]"; @@ -2985,7 +3014,10 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { switch (pNode->qType) { case DYN_QTYPE_STB_HASH: { - code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]); + code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, pNode->stbJoin.batchJoin); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot1, pNode->stbJoin.vgSlot[1]); } @@ -3015,6 +3047,9 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) { switch (pNode->qType) { case DYN_QTYPE_STB_HASH: { tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, &pNode->stbJoin.batchJoin); + } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0], code); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 6648186b4d..10e8674bc0 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3530,6 +3530,7 @@ enum { PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL, PHY_GROUP_CACHE_CODE_GROUP_BY_UID, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, + PHY_GROUP_CACHE_CODE_BATCH_FETCH, PHY_GROUP_CACHE_CODE_GROUP_COLUMNS }; @@ -3549,6 +3550,9 @@ static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_BATCH_FETCH, pNode->batchFetch); + } return code; } @@ -3575,6 +3579,9 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { case PHY_GROUP_CACHE_CODE_GLOBAL_GROUP: code = tlvDecodeBool(pTlv, &pNode->globalGrp); break; + case PHY_GROUP_CACHE_CODE_BATCH_FETCH: + code = tlvDecodeBool(pTlv, &pNode->batchFetch); + break; default: break; } @@ -3587,6 +3594,7 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1, PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0, @@ -3603,7 +3611,10 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode if (TSDB_CODE_SUCCESS == code) { switch (pNode->qType) { case DYN_QTYPE_STB_HASH: { - code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]); + code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN, pNode->stbJoin.batchJoin); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]); + } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, pNode->stbJoin.vgSlot[1]); } @@ -3635,6 +3646,9 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) { case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE: code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType)); break; + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN: + code = tlvDecodeBool(pTlv, &pNode->stbJoin.batchJoin); + break; case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0: code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[0], sizeof(pNode->stbJoin.vgSlot[0])); break; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f808c644d9..a62bb7d73c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3232,6 +3232,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** pGrpCache->node.dynamicOp = true; pGrpCache->grpColsMayBeNull = false; pGrpCache->grpByUid = true; + pGrpCache->batchFetch = true; pGrpCache->node.pChildren = pChildren; pGrpCache->node.pTargets = nodesMakeList(); if (NULL == pGrpCache->node.pTargets) { @@ -3337,6 +3338,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p } pDynCtrl->qType = DYN_QTYPE_STB_HASH; + pDynCtrl->stbJoin.batchJoin = true; if (TSDB_CODE_SUCCESS == code) { pDynCtrl->node.pChildren = nodesMakeList(); @@ -3346,24 +3348,24 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p } if (TSDB_CODE_SUCCESS == code) { - pDynCtrl->pVgList = nodesMakeList(); - if (NULL == pDynCtrl->pVgList) { + pDynCtrl->stbJoin.pVgList = nodesMakeList(); + if (NULL == pDynCtrl->stbJoin.pVgList) { code = TSDB_CODE_OUT_OF_MEMORY; } } if (TSDB_CODE_SUCCESS == code) { - pDynCtrl->pUidList = nodesMakeList(); - if (NULL == pDynCtrl->pUidList) { + pDynCtrl->stbJoin.pUidList = nodesMakeList(); + if (NULL == pDynCtrl->stbJoin.pUidList) { code = TSDB_CODE_OUT_OF_MEMORY; } } SJoinLogicNode* pHJoin = (SJoinLogicNode*)pPrev; - nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 0)); - nodesListStrictAppend(pDynCtrl->pUidList, nodesListGetNode(pHJoin->node.pTargets, 2)); - nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 1)); - nodesListStrictAppend(pDynCtrl->pVgList, nodesListGetNode(pHJoin->node.pTargets, 3)); + nodesListStrictAppend(pDynCtrl->stbJoin.pUidList, nodesListGetNode(pHJoin->node.pTargets, 0)); + nodesListStrictAppend(pDynCtrl->stbJoin.pUidList, nodesListGetNode(pHJoin->node.pTargets, 2)); + nodesListStrictAppend(pDynCtrl->stbJoin.pVgList, nodesListGetNode(pHJoin->node.pTargets, 1)); + nodesListStrictAppend(pDynCtrl->stbJoin.pVgList, nodesListGetNode(pHJoin->node.pTargets, 3)); if (TSDB_CODE_SUCCESS == code) { nodesListStrictAppend(pDynCtrl->node.pChildren, (SNode*)pPrev); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ff0f90498f..8afbc4eb40 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -988,6 +988,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull; pGrpCache->grpByUid = pLogicNode->grpByUid; pGrpCache->globalGrp = pLogicNode->globalGrp; + pGrpCache->batchFetch = pLogicNode->batchFetch; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; int32_t code = TSDB_CODE_SUCCESS; if (TSDB_CODE_SUCCESS == code) { @@ -1002,21 +1003,15 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh return code; } - -static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode, - SPhysiNode** pPhyNode) { - SDynQueryCtrlPhysiNode* pDynCtrl = - (SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL); - if (NULL == pDynCtrl) { - return TSDB_CODE_OUT_OF_MEMORY; - } +static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode, + SDynQueryCtrlPhysiNode* pDynCtrl) { SDataBlockDescNode* pPrevDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; SNodeList* pVgList = NULL; SNodeList* pUidList = NULL; - int32_t code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pVgList, &pVgList); + int32_t code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pVgList, &pVgList); if (TSDB_CODE_SUCCESS == code) { - code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->pUidList, &pUidList); + code = setListSlotId(pCxt, pPrevDesc->dataBlockId, -1, pLogicNode->stbJoin.pUidList, &pUidList); } if (TSDB_CODE_SUCCESS == code) { SNode* pNode = NULL; @@ -1030,10 +1025,32 @@ static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* p pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId; ++i; } + pDynCtrl->stbJoin.batchJoin = pLogicNode->stbJoin.batchJoin; } + + return code; +} + +static int32_t createDynQueryCtrlPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SDynQueryCtrlLogicNode* pLogicNode, + SPhysiNode** pPhyNode) { + int32_t code = TSDB_CODE_SUCCESS; + SDynQueryCtrlPhysiNode* pDynCtrl = + (SDynQueryCtrlPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pLogicNode, QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL); + if (NULL == pDynCtrl) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + switch (pLogicNode->qType) { + case DYN_QTYPE_STB_HASH: + code = updateDynQueryCtrlStbJoinInfo(pCxt, pChildren, pLogicNode, pDynCtrl); + break; + default: + planError("Invalid dyn query ctrl type:%d", pLogicNode->qType); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + if (TSDB_CODE_SUCCESS == code) { pDynCtrl->qType = pLogicNode->qType; - *pPhyNode = (SPhysiNode*)pDynCtrl; }