diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index da31907245..ba2f4aa2a5 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -309,6 +309,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR if (pTableScanNode->scan.node.dynamicOp) { pTaskInfo->dynamicTask = true; + pTableListInfo->idInfo.suid = pTableScanNode->scan.suid; + pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType; } else { code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9ec52c16fa..11c1536c62 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -784,8 +784,33 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { return NULL; } -static int32_t createTableListInfoFromParam(STableScanInfo* pInfo, STableScanOperatorParam* pParam) { +static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { + STableScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = 0; + STableListInfo* pListInfo = pInfo->base.pTableListInfo; + int32_t num = taosArrayGetSize(pOperator->pOperatorParam->pUidList); + if (num <= 0) { + qError("empty table scan uid list"); + return TSDB_CODE_INVALID_PARA; + } + qDebug("add total %d dynamic tables to scan", num); + for (int32_t i = 0; i < num; ++i) { + uint64_t* pUid = taosArrayGet(pOperator->pOperatorParam->pUidList, i); + STableKeyInfo info = {.uid = *pUid, .groupId = 0}; + void* p = taosArrayPush(pListInfo->pTableList, &info); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t)); + + qTrace("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo)); + } + + pOperator->pOperatorParam = NULL; + return code; } static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { @@ -794,7 +819,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; if (pOperator->pOperatorParam) { - int32_t code = createTableListInfoFromParam(pInfo, (STableScanOperatorParam*)pOperator->pOperatorParam); + int32_t code = createTableListInfoFromParam(pOperator); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 4bc357d7dd..eac96faf5d 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -120,6 +120,7 @@ typedef struct SQWTaskCtx { int8_t explain; int8_t needFetch; int8_t localExec; + int8_t dynamicTask; int32_t queryMsgType; int32_t fetchMsgType; int32_t level; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index d2cf929290..3a9bfbc81e 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -412,11 +412,13 @@ int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt)); } - if (!qIsDynamicExecTask(ctx->taskHandle)) { + if (!ctx->dynamicTask) { return TSDB_CODE_SUCCESS; } - qwHandleTaskComplete(QW_FPARAMS_DEF, ctx); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + + qwHandleTaskComplete(QW_FPARAMS(), ctx); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c5edbe1d96..54fabc00af 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -203,7 +203,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } if (numOfResBlock == 0 || (hasMore == false)) { - if (!qIsDynamicExecTask(taskHandle)) { + if (!ctx->dynamicTask) { if (numOfResBlock == 0) { QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); } else { @@ -217,6 +217,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { } else { QW_TASK_DLOG("dyn task qExecTask done, useconds:%" PRIu64, useconds); } + + ctx->queryExecDone = true; } dsEndPut(sinkHandle, useconds); @@ -341,7 +343,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + if (!ctx->dynamicTask) { + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + } + if (NULL == rsp) { QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp)); *pOutput = output; @@ -481,6 +486,22 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i return TSDB_CODE_SUCCESS; } +int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) { + if (!ctx->queryExecDone || !ctx->queryEnd) { + QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d, queryEnd:%d", ctx->queryExecDone, ctx->queryEnd); + return TSDB_CODE_ACTION_IN_PROGRESS; + } + + qUpdateOperatorParam(ctx->taskHandle); + + atomic_store_8((int8_t *)&ctx->queryInQueue, 1); + + QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); + + return TSDB_CODE_SUCCESS; +} + + int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx *ctx = NULL; @@ -734,13 +755,17 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { //qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); ctx->level = plan->level; + ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo) atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); qwSaveTbVersionInfo(pTaskInfo, ctx); - if (!qIsDynamicExecTask(pTaskInfo)) { + if (!ctx->dynamicTask) { QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); + } else { + ctx->queryExecDone = true; + ctx->queryEnd = true; } _return: @@ -862,7 +887,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->dataConnInfo = qwMsg->connInfo; if (qwMsg->msg) { - qUpdateOperatorParam(ctx->taskHandle); + code = qwStartDynamicTaskNewExec(QW_FPARAMS(), ctx, qwMsg); + goto _return; } SOutputData sOutput = {0};