diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a96f737cc2..6cbcd9d4be 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2052,25 +2052,16 @@ typedef struct { typedef struct SOperatorParam { - SArray* pOpParams; //SArray + int32_t opType; + int32_t downstreamIdx; + void* value; + SArray* pChildren; //SArray } SOperatorParam; -typedef struct SOperatorSpecParam { - int32_t opType; - void* value; -} SOperatorSpecParam; - -typedef struct SOperatorBaseParam { - SOperatorParam* pChild; -} SOperatorBaseParam; - - typedef struct STableScanOperatorParam { - SOperatorParam* pChild; SArray* pUidList; } STableScanOperatorParam; - typedef struct { SMsgHead header; uint64_t sId; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6ef6ccbb4d..e1a06f515b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5500,71 +5500,70 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { } int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { - int32_t n = taosArrayGetSize(pOpParam->pOpParams); + if (tEncodeI32(pEncoder, pOpParam->opType) < 0) return -1; + switch (pOpParam->opType) { + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { + STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value; + int32_t uidNum = taosArrayGetSize(pScan->pUidList); + if (tEncodeI32(pEncoder, uidNum) < 0) return -1; + for (int32_t m = 0; m < uidNum; ++m) { + int64_t* pUid = taosArrayGet(pScan->pUidList, m); + if (tEncodeI64(pEncoder, *pUid) < 0) return -1; + } + break; + } + default: + return TSDB_CODE_INVALID_PARA; + } + + int32_t n = taosArrayGetSize(pOpParam->pChildren); if (tEncodeI32(pEncoder, n) < 0) return -1; for (int32_t i = 0; i < n; ++i) { - SOperatorSpecParam* pSpec = (SOperatorSpecParam*)taosArrayGet(pOpParam->pOpParams, i); - if (tEncodeI32(pEncoder, pSpec->opType) < 0) return -1; - switch (pSpec->opType) { - case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { - STableScanOperatorParam* pScan = (STableScanOperatorParam*)pSpec->value; - if (pScan->pChild) { - if (tSerializeSOperatorParam(pEncoder, pScan->pChild) < 0) return -1; - } else { - if (tEncodeI32(pEncoder, 0) < 0) return -1; - } - int32_t uidNum = taosArrayGetSize(pScan->pUidList); - if (tEncodeI32(pEncoder, uidNum) < 0) return -1; - for (int32_t m = 0; m < uidNum; ++m) { - int64_t* pUid = taosArrayGet(pScan->pUidList, m); - if (tEncodeI64(pEncoder, *pUid) < 0) return -1; - } - break; - } - default: - return TSDB_CODE_INVALID_PARA; - } + SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOpParam->pChildren, i); + if (tSerializeSOperatorParam(pEncoder, pChild) < 0) return -1; } return 0; } -int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam, int32_t specNum) { - pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam)); - if (NULL == pOpParam->pOpParams) return -1; - - SOperatorSpecParam specParam; - for (int32_t i = 0; i < specNum; ++i) { - if (tDecodeI32(pDecoder, &specParam.opType) < 0) return -1; - switch (specParam.opType) { - case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { - STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); - if (NULL == pScan) return -1; - int32_t childSpecNum = 0; - if (tDecodeI32(pDecoder, &childSpecNum) < 0) return -1; - if (childSpecNum > 0) { - pScan->pChild = taosMemoryMalloc(sizeof(SOperatorParam)); - if (NULL == pScan->pChild) return -1; - if (tDeserializeSOperatorParam(pDecoder, pScan->pChild, childSpecNum) < 0) return -1; +int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) { + if (tDecodeI32(pDecoder, &pOpParam->opType) < 0) return -1; + switch (pOpParam->opType) { + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { + STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); + if (NULL == pScan) return -1; + int32_t uidNum = 0; + int64_t uid = 0; + if (tDecodeI32(pDecoder, &uidNum) < 0) return -1; + if (uidNum > 0) { + pScan->pUidList = taosArrayInit(uidNum, sizeof(int64_t)); + if (NULL == pScan->pUidList) return -1; + for (int32_t m = 0; m < uidNum; ++m) { + if (tDecodeI64(pDecoder, &uid) < 0) return -1; + taosArrayPush(pScan->pUidList, &uid); } - int32_t uidNum = 0; - int64_t uid = 0; - if (tDecodeI32(pDecoder, &uidNum) < 0) return -1; - if (uidNum > 0) { - pScan->pUidList = taosArrayInit(uidNum, sizeof(int64_t)); - if (NULL == pScan->pUidList) return -1; - for (int32_t m = 0; m < uidNum; ++m) { - if (tDecodeI64(pDecoder, &uid) < 0) return -1; - taosArrayPush(pScan->pUidList, &uid); - } - } - specParam.value = pScan; - break; + } else { + pScan->pUidList = NULL; } - default: - return TSDB_CODE_INVALID_PARA; + break; } - taosArrayPush(pOpParam->pOpParams, &specParam); + default: + return TSDB_CODE_INVALID_PARA; + } + + int32_t childrenNum = 0; + if (tDecodeI32(pDecoder, &childrenNum) < 0) return -1; + if (childrenNum > 0) { + pOpParam->pChildren = taosArrayInit(childrenNum, POINTER_BYTES); + if (NULL == pOpParam->pChildren) return -1; + for (int32_t i = 0; i < childrenNum; ++i) { + SOperatorParam* pChild = taosMemoryCalloc(1, sizeof(SOperatorParam)); + if (NULL == pChild) return -1; + if (tDeserializeSOperatorParam(pDecoder, pChild) < 0) return -1; + taosArrayPush(pOpParam->pChildren, &pChild); + } + } else { + pOpParam->pChildren = NULL; } return 0; @@ -5587,6 +5586,7 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; if (tEncodeI32(&encoder, pReq->execId) < 0) return -1; if (pReq->pOpParam) { + if (tEncodeI32(&encoder, 1) < 0) return -1; if (tSerializeSOperatorParam(&encoder, pReq->pOpParam) < 0) return -1; } else { if (tEncodeI32(&encoder, 0) < 0) return -1; @@ -5623,12 +5623,12 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; - int32_t specNum = 0; - if (tDecodeI32(&decoder, &specNum) < 0) return -1; - if (specNum > 0) { + int32_t paramNum = 0; + if (tDecodeI32(&decoder, ¶mNum) < 0) return -1; + if (paramNum > 0) { pReq->pOpParam = taosMemoryMalloc(sizeof(*pReq->pOpParam)); if (NULL == pReq->pOpParam) return -1; - if (tDeserializeSOperatorParam(&decoder, pReq->pOpParam, specNum) < 0) return -1; + if (tDeserializeSOperatorParam(&decoder, pReq->pOpParam) < 0) return -1; } tEndDecode(&decoder); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 59bb83ec9e..b73e347f5d 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -149,11 +149,9 @@ typedef struct SLimitInfo { } SLimitInfo; typedef struct SSortMergeJoinOperatorParam { - SOperatorParam* pChild; } SSortMergeJoinOperatorParam; typedef struct SExchangeOperatorParam { - SOperatorParam* pChild; int32_t vgId; int32_t srcOpType; SArray* uidList; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index d5e65be0ee..62b06f62a5 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -62,6 +62,11 @@ typedef struct SGcSessionCtx { SGcBlkBufInfo* pLastBlk; } SGcSessionCtx; +typedef struct SGcExecInfo { + int32_t downstreamNum; + int64_t* pDownstreamBlkNum; +} SGcExecInfo; + typedef struct SGroupCacheOperatorInfo { SSHashObj* pSessionHash; SGroupColsInfo groupColsInfo; @@ -69,6 +74,7 @@ typedef struct SGroupCacheOperatorInfo { SSHashObj* pBlkHash; int64_t pCurrentId; SGcSessionCtx* pCurrent; + SGcExecInfo execInfo; } SGroupCacheOperatorInfo; #ifdef __cplusplus diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 44be65f74b..8bd80c438f 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -73,7 +73,7 @@ typedef struct SOperatorInfo { SExecTaskInfo* pTaskInfo; SOperatorCostInfo cost; SResultInfo resultInfo; - SOperatorBaseParam* pOperatorParam; + SOperatorParam* pOperatorParam; SOperatorParam** pDownstreamParams; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 1cf91d8295..842b21097e 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -41,48 +41,43 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; } - (*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam)); + (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; } + if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) { + return TSDB_CODE_OUT_OF_MEMORY; + } SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam)); if (NULL == pGc) { return TSDB_CODE_OUT_OF_MEMORY; } - pGc->pChild = pChild; pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1); pGc->downstreamIdx = downstreamIdx; pGc->needCache = needCache; pGc->pGroupValue = pGrpValue; pGc->groupValueSize = grpValSize; - SOperatorSpecParam specParam; - specParam.opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; - specParam.value = pGc; - - taosArrayPush((*ppRes)->pOpParams, &specParam); + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; + (*ppRes)->value = pGc; return TSDB_CODE_SUCCESS; } -static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) { +static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; } - (*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } + (*ppRes)->pChildren = NULL; SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam)); if (NULL == pExc) { return TSDB_CODE_OUT_OF_MEMORY; } - pExc->pChild = pChild; pExc->vgId = *pVgId; pExc->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pExc->uidList = taosArrayInit(1, sizeof(int64_t)); @@ -92,12 +87,10 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i } taosArrayPush(pExc->uidList, pUid); - SOperatorSpecParam specParam; - specParam.opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - specParam.value = pExc; - - taosArrayPush((*ppRes)->pOpParams, &specParam); - + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; + (*ppRes)->downstreamIdx = downstreamIdx; + (*ppRes)->value = pExc; + return TSDB_CODE_SUCCESS; } @@ -106,28 +99,24 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; } - (*ppRes)->pOpParams = taosArrayInit(2, sizeof(SOperatorSpecParam)); + (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; } - - SSortMergeJoinOperatorParam* pJoin0 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); - SSortMergeJoinOperatorParam* pJoin1 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); - if (NULL == pJoin0 || NULL == pJoin1) { - taosMemoryFree(pJoin0); - taosMemoryFree(pJoin1); + if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { return TSDB_CODE_OUT_OF_MEMORY; } - pJoin0->pChild = pChild0; - pJoin1->pChild = pChild1; - - SOperatorSpecParam specParam; - specParam.opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; - specParam.value = pJoin0; - taosArrayPush((*ppRes)->pOpParams, &specParam); - specParam.value = pJoin1; - taosArrayPush((*ppRes)->pOpParams, &specParam); + SSortMergeJoinOperatorParam* pJoin = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); + if (NULL == pJoin) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; + (*ppRes)->value = pJoin; return TSDB_CODE_SUCCESS; } @@ -142,11 +131,11 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ SOperatorParam* pExcParam0 = NULL; SOperatorParam* pExcParam1 = NULL; SOperatorParam* pGcParam0 = NULL; - SOperatorParam* pGcParam1 = NULL; + SOperatorParam* pGcParam1 = NULL; - int32_t code = buildExchangeOperatorParam(&pExcParam0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL); + int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx), (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx), NULL); if (TSDB_CODE_SUCCESS == code) { - code = buildExchangeOperatorParam(&pExcParam1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL); + code = buildExchangeOperatorParam(&pExcParam1, 1, (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx), (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx), NULL); } if (TSDB_CODE_SUCCESS == code) { code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 1ef87875ae..978216914b 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -93,7 +93,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } SRetrieveTableRsp* pRsp = pDataInfo->pRsp; - SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); + SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pDataInfo->index); // todo SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; @@ -422,27 +422,20 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; } - (*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam)); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } - pScan->pChild = NULL; pScan->pUidList = taosArrayDup(pUidList, NULL); if (NULL == pScan->pUidList) { return TSDB_CODE_OUT_OF_MEMORY; } - SOperatorSpecParam specParam; - specParam.opType = srcOpType; - specParam.value = pScan; - - taosArrayPush((*ppRes)->pOpParams, &specParam); + (*ppRes)->opType = srcOpType; + (*ppRes)->value = pScan; + (*ppRes)->pChildren = NULL; return TSDB_CODE_SUCCESS; } @@ -753,7 +746,7 @@ _error: int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; - SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam; + SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value; int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pParam->vgId, sizeof(pParam->vgId)); if (NULL == pIdx) { qError("No exchange source for vgId: %d", pParam->vgId); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 47dd761a18..939b516d73 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -66,9 +66,23 @@ static void freeGroupCacheBufPage(void* param) { taosMemoryFree(pInfo->data); } +static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) { + char* buf = taosMemoryMalloc(pGrpCacheOperator->execInfo.downstreamNum * 32 + 100); + if (NULL == buf) { + return; + } + int32_t offset = sprintf(buf, "groupCache exec info, downstreamBlkNum:"); + for (int32_t i = 0; i < pGrpCacheOperator->execInfo.downstreamNum; ++i) { + offset += sprintf(buf + offset, " %" PRId64 , pGrpCacheOperator->execInfo.pDownstreamBlkNum[i]); + } + qDebug("%s", buf); +} + static void destroyGroupCacheOperator(void* param) { SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param; + logGroupCacheExecInfo(pGrpCacheOperator); + taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo); taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage); @@ -191,7 +205,7 @@ static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBl SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { SGroupCacheOperatorInfo* pGCache = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam; + SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam->value; SGcSessionCtx* pSession = NULL; SSDataBlock* pRes = NULL; int32_t code = TSDB_CODE_SUCCESS; @@ -211,8 +225,12 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { break; } + pGCache->execInfo.pDownstreamBlkNum[pSession->downstreamIdx]++; + if (pGCache->pCurrent->needCache) { addBlkToGroupCache(pOperator, pBlock, &pRes); + } else { + pRes = pBlock; } break; } @@ -220,6 +238,15 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { return pRes; } +static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { + SGroupCacheOperatorInfo* pInfo = pOperator->info; + pInfo->execInfo.downstreamNum = pOperator->numOfDownstream; + pInfo->execInfo.pDownstreamBlkNum = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(int64_t)); + if (NULL == pInfo->execInfo.pDownstreamBlkNum) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; +} SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { @@ -263,6 +290,11 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } + code = initGroupCacheExecInfo(pOperator); + if (TSDB_CODE_SUCCESS != code) { + goto _error; + } + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index b99562f879..9f0659cb2e 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -605,26 +605,14 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf return TSDB_CODE_SUCCESS; } -void *getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx) { - if (NULL == param) { - return NULL; - } - int32_t opNum = taosArrayGetSize(param->pOpParams); - if (idx >= opNum) { - return NULL; - } - SOperatorSpecParam *pSpecParam = taosArrayGet(param->pOpParams, idx); - if (opType == pSpecParam->opType) { - return pSpecParam->value; - } - return NULL; -} - int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { if (NULL == pParam) { + pOperator->pOperatorParam = NULL; taosMemoryFreeClear(pOperator->pDownstreamParams); return TSDB_CODE_SUCCESS; } + + pOperator->pOperatorParam = (pParam->opType == pOperator->operatorType) ? pParam : NULL; if (NULL == pOperator->pDownstreamParams) { pOperator->pDownstreamParams = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES); @@ -632,16 +620,24 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara return TSDB_CODE_OUT_OF_MEMORY; } } - - SOperatorBaseParam* pBaseParam = NULL; - for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - pBaseParam = getOperatorParam(pOperator->operatorType, pParam, i); - if (pBaseParam) { - pOperator->pDownstreamParams[i] = pBaseParam->pChild; - } else { + if (NULL == pOperator->pOperatorParam) { + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { pOperator->pDownstreamParams[i] = pParam; } + return TSDB_CODE_SUCCESS; + } + + memset(pOperator->pDownstreamParams, 0, pOperator->numOfDownstream * POINTER_BYTES); + + int32_t childrenNum = taosArrayGetSize(pOperator->pOperatorParam->pChildren); + if (childrenNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + for (int32_t i = 0; i < childrenNum; ++i) { + SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i); + pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild; } return TSDB_CODE_SUCCESS; @@ -671,8 +667,7 @@ SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { - pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0); - int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam); + int32_t code = setOperatorParams(pOperator, pParam); if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 26ef9aae4b..e7cefb8cd6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -789,7 +789,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = 0; STableListInfo* pListInfo = pInfo->base.pTableListInfo; - STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam; + STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam->value; int32_t num = taosArrayGetSize(pParam->pUidList); if (num <= 0) { qError("empty table scan uid list"); @@ -797,6 +797,9 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { } qDebug("add total %d dynamic tables to scan", num); + + taosArrayClear(pListInfo->pTableList); + taosHashClear(pListInfo->map); for (int32_t i = 0; i < num; ++i) { uint64_t* pUid = taosArrayGet(pParam->pUidList, i); @@ -816,6 +819,32 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { return code; } +static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { + STableScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + + // reset value for the next group data output + pOperator->status = OP_OPENED; + resetLimitInfoForNextGroup(&pInfo->base.limitInfo); + + int32_t num = 0; + STableKeyInfo* pList = NULL; + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); + + pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num); + pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); + pInfo->scanTimes = 0; + + SSDataBlock* result = doGroupedTableScan(pOperator); + if (result != NULL) { + return result; + } + + setOperatorCompleted(pOperator); + return NULL; +} + static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -827,6 +856,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } + + if (pInfo->currentGroupId != -1) { + return startNextGroupScan(pOperator); + } } // scan table one by one sequentially @@ -895,25 +928,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - // reset value for the next group data output - pOperator->status = OP_OPENED; - resetLimitInfoForNextGroup(&pInfo->base.limitInfo); - - int32_t num = 0; - STableKeyInfo* pList = NULL; - tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); - - pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num); - pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); - pInfo->scanTimes = 0; - - result = doGroupedTableScan(pOperator); - if (result != NULL) { - return result; - } - - setOperatorCompleted(pOperator); - return NULL; + return startNextGroupScan(pOperator); } } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 4f43f31226..0a4470da00 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -124,6 +124,7 @@ typedef struct SQWTaskCtx { int32_t queryMsgType; int32_t fetchMsgType; int32_t level; + int32_t dynExecId; uint64_t sId; bool queryGotData; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 06becf99d2..ab676076f1 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -488,14 +488,22 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i } int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) { - if (!ctx->queryExecDone || !ctx->queryEnd) { - QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d, queryEnd:%d", ctx->queryExecDone, ctx->queryEnd); + if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryExecDone, true, false)) { + QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d", ctx->queryExecDone); return TSDB_CODE_ACTION_IN_PROGRESS; } + if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryEnd, true, false)) { + QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd); + return TSDB_CODE_ACTION_IN_PROGRESS; + } qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); + atomic_store_8((int8_t *)&ctx->queryInQueue, 1); + + QW_TASK_DLOG("the %dth dynamic task exec started", ctx->dynExecId++); QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -515,7 +523,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_SET_PHASE(ctx, phase); - if (atomic_load_8((int8_t *)&ctx->queryEnd)) { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { QW_TASK_ELOG_E("query already end"); QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); }