From 0718859c0a9bcb44be29c682e5e46d303e339890 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 5 Jul 2023 19:24:06 +0800 Subject: [PATCH] enh: support passing params between nodes --- include/common/tmsg.h | 31 +++++-- include/libs/executor/executor.h | 2 + source/common/src/tmsg.c | 85 +++++++++++++++++++ source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/inc/operator.h | 13 --- source/libs/executor/inc/querytask.h | 3 + .../libs/executor/src/dynqueryctrloperator.c | 1 + source/libs/executor/src/exchangeoperator.c | 48 +++++++++-- source/libs/executor/src/executor.c | 21 ++++- source/libs/executor/src/operator.c | 31 ++++--- source/libs/executor/src/scanoperator.c | 12 +++ source/libs/qworker/src/qwMsg.c | 2 +- source/libs/qworker/src/qwUtil.c | 21 +++++ source/libs/qworker/src/qworker.c | 28 ++++-- 14 files changed, 256 insertions(+), 43 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 63805d83ae..e4038b27de 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1842,13 +1842,32 @@ typedef struct { int32_t tversion; } SResReadyRsp; +typedef struct SOperatorSpecParam { + int32_t opType; + void* value; +} SOperatorSpecParam; + +typedef struct SOperatorBaseParam { + SOperatorParam* pChild; +} SOperatorBaseParam; + +typedef struct SOperatorParam { + SArray* pOpParams; //SArray +} SOperatorParam; + +typedef struct STableScanOperatorParam { + SOperatorParam* pChild; + SArray* pUidList; +} STableScanOperatorParam; + + typedef struct { - SMsgHead header; - uint64_t sId; - uint64_t queryId; - uint64_t taskId; - int32_t execId; - void* opParam; + SMsgHead header; + uint64_t sId; + uint64_t queryId; + uint64_t taskId; + int32_t execId; + SOperatorParam* pOpParam; } SResFetchReq; int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 3bef15f3a7..5cc8eec39e 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -132,6 +132,8 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, */ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd); +bool qIsDynamicExecTask(qTaskInfo_t tinfo); + /** * Create the exec task object according to task json * @param readHandle diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index debb93e8ba..74bca00cb8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5499,6 +5499,78 @@ void tFreeSSubQueryMsg(SSubQueryMsg *pReq) { taosMemoryFreeClear(pReq->msg); } +int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { + int32_t n = taosArrayGetSize(pOpParam->pOpParams); + if (tEncodeI32(pEncoder, n) < 0) return -1; + for (int32_t i = 0; i < n; ++i) { + SOperatorSpecParam* pSpec = (SOperatorSpecParam*)taosArrayGet(pOpParam->pOpParams, i); + if (tEncodeI32(pEncoder, pSpec->opType) < 0) return -1; + switch (pSpec->opType) { + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { + STableScanOperatorParam* pScan = (STableScanOperatorParam*)pSpec->value; + if (pScan->pChild) { + if (tSerializeSOperatorParam(pEncoder, pScan->pChild) < 0) return -1; + } else { + if (tEncodeI32(pEncoder, 0) < 0) return -1; + } + int32_t uidNum = taosArrayGetSize(pScan->pUidList); + if (tEncodeI32(pEncoder, uidNum) < 0) return -1; + for (int32_t m = 0; m < uidNum; ++m) { + int64_t* pUid = taosArrayGet(pScan->pUidList, m); + if (tEncodeI64(pEncoder, *pUid) < 0) return -1; + } + break; + } + default: + return TSDB_CODE_INVALID_PARA; + } + } + + return 0; +} + +int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam, int32_t specNum) { + pOpParam->pOpParams = taosArrayInit(specNum, sizeof(SOperatorSpecParam)) + if (NULL == pOpParam->pOpParams) return -1; + + SOperatorSpecParam specParam; + for (int32_t i = 0; i < specNum; ++i) { + if (tDecodeI32(pDecoder, &specParam.opType) < 0) return -1; + switch (specParam.opType) { + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { + STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); + if (NULL == pScan) return -1; + int32_t childSpecNum = 0; + if (tDecodeI32(pDecoder, &childSpecNum) < 0) return -1; + if (childSpecNum > 0) { + pScan->pChild = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == pScan->pChild) return -1; + if (tDeserializeSOperatorParam(pDecoder, pScan->pChild, childSpecNum) < 0) return -1; + } + int32_t uidNum = 0; + int64_t uid = 0; + if (tDecodeI32(pDecoder, &uidNum) < 0) return -1; + if (uidNum > 0) { + pScan->pUidList = taosArrayInit(uidNum, sizeof(int64_t)); + if (NULL == pScan->pUidList) return -1; + for (int32_t m = 0; m < uidNum; ++m) { + if (tDecodeI64(pDecoder, &uid) < 0) return -1; + taosArrayPush(pScan->pUidList, &uid); + } + } + specParam.value = pScan; + break; + } + default: + return TSDB_CODE_INVALID_PARA; + } + taosArrayPush(pOpParam->pOpParams, &specParam); + } + + return 0; +} + + int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { @@ -5514,6 +5586,11 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) { if (tEncodeU64(&encoder, pReq->queryId) < 0) return -1; if (tEncodeU64(&encoder, pReq->taskId) < 0) return -1; if (tEncodeI32(&encoder, pReq->execId) < 0) return -1; + if (pReq->pOpParam) { + if (tSerializeSOperatorParam(&encoder, pReq->pOpParam) < 0) return -1; + } else { + if (tEncodeI32(&encoder, 0) < 0) return -1; + } tEndEncode(&encoder); @@ -5546,6 +5623,14 @@ int32_t tDeserializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq) if (tDecodeU64(&decoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->execId) < 0) return -1; + int32_t specNum = 0; + if (tDecodeI32(&decoder, &specNum) < 0) return -1; + if (specNum > 0) { + pReq->pOpParam = taosMemoryMalloc(sizeof(*pReq->pOpParam)); + if (NULL == pReq->pOpParam) return -1; + if (tDeserializeSOperatorParam(&decoder, pReq->pOpParam, specNum) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 522475e568..7c134f07b0 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -146,6 +146,7 @@ typedef struct SSortMergeJoinOperatorParam { typedef struct SExchangeOperatorParam { SOperatorParam* pChild; int32_t vgId; + int32_t srcOpType; SArray* uidList; } SExchangeOperatorParam; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 5693586882..59e57e6060 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -27,19 +27,6 @@ typedef struct SOperatorCostInfo { struct SOperatorInfo; -typedef struct SOperatorSpecParam { - int32_t opType; - void* value; -} SOperatorSpecParam; - -typedef struct SOperatorBaseParam { - SOperatorParam* pChild; -} SOperatorBaseParam; - -typedef struct SOperatorParam { - SArray* pOpParams; //SArray -} SOperatorParam; - typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index cdf37bcc6b..68acf07aee 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -94,6 +94,9 @@ struct SExecTaskInfo { STaskStopInfo stopInfo; SRWLatch lock; // secure the access of STableListInfo SStorageAPI storageAPI; + int8_t dynamicTask; + SOperatorParam* pOpParam; + bool paramSet; }; void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst); diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index db124b3a9d..6aecce1a76 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -49,6 +49,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, return TSDB_CODE_OUT_OF_MEMORY; } + pGc->pChild = pChild; pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1); pGc->downstreamIdx = downstreamIdx; pGc->needCache = needCache; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 079712f7f1..9083608e83 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -40,7 +40,8 @@ typedef struct SSourceDataInfo { int32_t code; EX_SOURCE_STATUS status; const char* taskId; - SArray* pUidList; + SArray* pSrcUidList; + int32_t srcOpType; } SSourceDataInfo; static void destroyExchangeOperatorInfo(void* param); @@ -416,6 +417,37 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { return code; } +int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); + if (NULL == pScan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pScan->pChild = NULL; + pScan->pUidList = taosArrayDup(pUidList, NULL); + if (NULL == pScan->pUidList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SOperatorSpecParam specParam; + specParam.opType = srcOpType; + specParam.value = pScan; + + taosArrayPush((*ppRes)->pOpParams, &specParam); + + return TSDB_CODE_SUCCESS; +} + + int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) { @@ -445,10 +477,15 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas req.taskId = pSource->taskId; req.queryId = pTaskInfo->id.queryId; req.execId = pSource->execId; - if (pDataInfo->pUidList) { - req.opParam = buildTableScanOperatorParam(pDataInfo->pUidList); + if (pDataInfo->pSrcUidList) { + int32_t code = buildTableScanOperatorParam(&req.opParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType); + if (TSDB_CODE_SUCCESS != code) { + pTaskInfo->code = code; + taosMemoryFree(pWrapper); + return pTaskInfo->code; + } } - + int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req); if (msgSize < 0) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -726,7 +763,8 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo); dataInfo.index = *pIdx; - dataInfo.pUidList = taosArrayDup(pParam->uidList, NULL); + dataInfo.pSrcUidList = taosArrayDup(pParam->uidList, NULL); + dataInfo.srcOpType = pParam->srcOpType; taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a3d94a0891..5db9ac308f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -512,6 +512,16 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table return 0; } +bool qIsDynamicExecTask(qTaskInfo_t tinfo) { + return ((SExecTaskInfo*)tinfo)->dynamicTask; +} + +void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) { + destroyOperatorParam(((SExecTaskInfo*)tinfo)->pOpParam); + ((SExecTaskInfo*)tinfo)->pOpParam = pParam; + ((SExecTaskInfo*)tinfo)->paramSet = false; +} + int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) { SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; @@ -602,8 +612,15 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo int64_t st = taosGetTimestampUs(); + if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) { + pTaskInfo->paramSet = true; + pRes = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam); + } else { + pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + } + int32_t blockIndex = 0; - while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) { + while (pRes != NULL) { SSDataBlock* p = NULL; if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) { SSDataBlock* p1 = createOneDataBlock(pRes, true); @@ -623,6 +640,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo if (current >= 4096) { break; } + + pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); } *hasMore = (pRes != NULL); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 854ff7f6cf..da31907245 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -96,7 +96,7 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0); - int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : NULL); + int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : pParam); if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); @@ -292,7 +292,6 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR SOperatorInfo* pOperator = NULL; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - // NOTE: this is an patch to fix the physical plan // TODO remove it later if (pTableScanNode->scan.node.pLimit != NULL) { @@ -300,21 +299,25 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR } STableListInfo* pTableListInfo = tableListCreate(); - int32_t code = - createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); + + int32_t code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); if (code) { pTaskInfo->code = code; tableListDestroy(pTableListInfo); - qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); return NULL; } - code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); - if (code) { - pTaskInfo->code = code; - tableListDestroy(pTableListInfo); - return NULL; + if (pTableScanNode->scan.node.dynamicOp) { + pTaskInfo->dynamicTask = true; + } else { + code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, + pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); + if (code) { + pTaskInfo->code = code; + tableListDestroy(pTableListInfo); + qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); + return NULL; + } } pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); @@ -659,4 +662,10 @@ FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOper return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); } +void destroyOperatorParam(SOperatorParam* pParam) { + if (NULL == pParam) { + return; + } +} + diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5dcb20d90f..9ec52c16fa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -784,11 +784,23 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { return NULL; } +static int32_t createTableListInfoFromParam(STableScanInfo* pInfo, STableScanOperatorParam* pParam) { + +} + static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; + if (pOperator->pOperatorParam) { + int32_t code = createTableListInfoFromParam(pInfo, (STableScanOperatorParam*)pOperator->pOperatorParam); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + } + // scan table one by one sequentially if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { int32_t numOfTables = 0; // tableListGetSize(pTaskInfo->pTableListInfo); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 508e957e26..ec4fd8f9ce 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -512,7 +512,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int64_t rId = 0; int32_t eId = req.execId; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType}; + SQWMsg qwMsg = {.node = node, .msg = req.pOpParam, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType}; QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index a342e48cc1..d2cf929290 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -401,7 +401,28 @@ _return: QW_RET(code); } +int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); + SQWTaskCtx octx; + + SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + if (NULL == ctx) { + QW_TASK_DLOG_E("task ctx not exist, may be dropped"); + QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt)); + } + + if (!qIsDynamicExecTask(ctx->taskHandle)) { + return TSDB_CODE_SUCCESS; + } + + qwHandleTaskComplete(QW_FPARAMS_DEF, ctx); + + return TSDB_CODE_SUCCESS; +} + int32_t qwDropTask(QW_FPARAMS_DEF) { + QW_ERR_RET(qwHandleDynamicTaskEnd(QW_FPARAMS())); QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS())); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 6f641e677a..c5edbe1d96 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -203,15 +203,24 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } if (numOfResBlock == 0 || (hasMore == false)) { - if (numOfResBlock == 0) { - QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); + if (!qIsDynamicExecTask(taskHandle)) { + if (numOfResBlock == 0) { + QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); + } else { + QW_TASK_DLOG("qExecTask done, useconds:%" PRIu64, useconds); + } + + QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); } else { - QW_TASK_DLOG("qExecTask done, useconds:%" PRIu64, useconds); + if (numOfResBlock == 0) { + QW_TASK_DLOG("dyn task qExecTask end with empty res, useconds:%" PRIu64, useconds); + } else { + QW_TASK_DLOG("dyn task qExecTask done, useconds:%" PRIu64, useconds); + } } dsEndPut(sinkHandle, useconds); - QW_ERR_JRET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); - + if (queryStop) { *queryStop = true; } @@ -729,8 +738,11 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { atomic_store_ptr(&ctx->sinkHandle, sinkHandle); qwSaveTbVersionInfo(pTaskInfo, ctx); - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); + if (!qIsDynamicExecTask(pTaskInfo)) { + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); + } + _return: taosMemoryFree(sql); @@ -849,6 +861,10 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->fetchMsgType = qwMsg->msgType; ctx->dataConnInfo = qwMsg->connInfo; + if (qwMsg->msg) { + qUpdateOperatorParam(ctx->taskHandle); + } + SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));