diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 0a9037d21c..617ca7c23a 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -97,6 +97,8 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC void dsEndPut(DataSinkHandle handle, uint64_t useconds); +void dsReset(DataSinkHandle handle); + /** * Get the length of the data returned by the next call to dsGetDataBlock. * @param handle diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index edd44e3727..0eb3ee09b4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -20,6 +20,7 @@ int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **p int32_t code = 0; STsdbFD *pFD = NULL; + szPage = 4096; //debug only!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! *ppFD = NULL; pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1); diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 9893b4eb76..dcebd2c6fd 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -35,6 +35,7 @@ typedef struct SDataSinkManager { typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); +typedef void (*FReset)(struct SDataSinkHandle* pHandle); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); @@ -43,6 +44,7 @@ typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size typedef struct SDataSinkHandle { FPutDataBlock fPut; FEndPut fEndPut; + FReset fReset; FGetDataLength fGetLen; FGetDataBlock fGetData; FDestroyDataSinker fDestroy; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index bbb6c06f3d..0523b19308 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -41,7 +41,7 @@ typedef struct SGcFileCacheCtx { uint32_t fileId; SHashObj* pCacheFile; int32_t baseNameLen; - char baseFilename[PATH_MAX]; + char baseFilename[256]; } SGcFileCacheCtx; typedef struct SGcDownstreamCtx { @@ -115,6 +115,7 @@ typedef struct SGcSessionCtx { bool semInit; tsem_t waitSem; bool newFetch; + int64_t resRows; } SGcSessionCtx; typedef struct SGcBlkBufInfo { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 2a22656d8c..38f6541a39 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -156,6 +156,13 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { taosThreadMutexUnlock(&pDispatcher->mutex); } +static void resetDispatcher(struct SDataSinkHandle* pHandle) { + SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; + taosThreadMutexLock(&pDispatcher->mutex); + pDispatcher->queryEnd = false; + taosThreadMutexUnlock(&pDispatcher->mutex); +} + static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { @@ -182,6 +189,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); } + static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (NULL == pDispatcher->nextOutput.pData) { @@ -244,6 +252,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD } dispatcher->sink.fPut = putDataBlock; dispatcher->sink.fEndPut = endPut; + dispatcher->sink.fReset = resetDispatcher; dispatcher->sink.fGetLen = getDataLength; dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 3a972c1c20..38ec3ad393 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -59,6 +59,13 @@ void dsEndPut(DataSinkHandle handle, uint64_t useconds) { return pHandleImpl->fEndPut(pHandleImpl, useconds); } +void dsReset(DataSinkHandle handle) { + SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; + if (pHandleImpl->fReset) { + return pHandleImpl->fReset(pHandleImpl); + } +} + void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, bool* pQueryEnd) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index a6325e71b7..5535fe0216 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -186,7 +186,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr taosWLockLatch(&pCache->dirtyLock); pCache->blkCacheSize += pBufInfo->basic.bufSize; - qError("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize); + qError("group cache total dirty block num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize); if (NULL == pCache->pDirtyHead) { pCache->pDirtyHead = pBufInfo; @@ -259,7 +259,7 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB return code; } -void blockDataDeepCleanup(SSDataBlock* pDataBlock) { +void blockDataDeepClear(SSDataBlock* pDataBlock) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); @@ -276,6 +276,25 @@ void blockDataDeepCleanup(SSDataBlock* pDataBlock) { pDataBlock->info.rows = 0; } + +void blockDataDeepCleanup(SSDataBlock* pDataBlock) { + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + taosMemoryFreeClear(p->pData); + if (IS_VAR_DATA_TYPE(p->info.type)) { + taosMemoryFreeClear(p->varmeta.offset); + p->varmeta.length = 0; + p->varmeta.allocLen = 0; + } else { + taosMemoryFreeClear(p->nullbitmap); + } + } + pDataBlock->info.capacity = 0; + pDataBlock->info.rows = 0; +} + + static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) { *ppDst = taosMemoryMalloc(sizeof(*pSrc)); if (NULL == *ppDst) { @@ -288,7 +307,7 @@ static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) return TSDB_CODE_OUT_OF_MEMORY; } memcpy(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info)); - blockDataDeepCleanup(*ppDst); + blockDataDeepClear(*ppDst); return TSDB_CODE_SUCCESS; } @@ -919,8 +938,12 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock } code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes); - if (NULL == ppRes) { + if (NULL == *ppRes) { taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + qError("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows); + } else { + pSession->resRows += (*ppRes)->info.rows; + qError("session %" PRId64 " in downstream %d got %" PRId64 " rows in one block", pGcParam->sessionId, pCtx->id, (*ppRes)->info.rows); } return code; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 7bdea2a9c6..872ea53329 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -43,6 +43,7 @@ typedef struct SMJoinOperatorInfo { SSDataBlock* pRes; int32_t joinType; int32_t inputOrder; + bool downstreamFetchDone[2]; int16_t downstreamResBlkId[2]; SSDataBlock* pLeft; @@ -65,6 +66,8 @@ typedef struct SMJoinOperatorInfo { SSHashObj* rightBuildTable; SMJoinRowCtx rowCtx; + + int64_t resRows; } SMJoinOperatorInfo; static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); @@ -457,7 +460,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator } if (dataBlock == NULL) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + pJoinInfo->downstreamFetchDone[whichChild] = true; endPos = -1; break; } @@ -654,11 +657,15 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs bool leftEmpty = false; if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { - pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0); - - pJoinInfo->leftPos = 0; - qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0); + if (!pJoinInfo->downstreamFetchDone[0]) { + pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0); + pJoinInfo->leftPos = 0; + qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0); + } else { + pJoinInfo->pLeft = NULL; + } + if (pJoinInfo->pLeft == NULL) { if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) { leftEmpty = true; @@ -670,10 +677,14 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs } if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { - pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1); + if (!pJoinInfo->downstreamFetchDone[1]) { + pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1); - pJoinInfo->rightPos = 0; - qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0); + pJoinInfo->rightPos = 0; + qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0); + } else { + pJoinInfo->pRight = NULL; + } if (pJoinInfo->pRight == NULL) { setMergeJoinDone(pOperator); @@ -753,15 +764,21 @@ void resetMergeJoinOperator(struct SOperatorInfo* pOperator) { pJoinInfo->leftPos = 0; pJoinInfo->pRight = NULL; pJoinInfo->rightPos = 0; + pJoinInfo->downstreamFetchDone[0] = false; + pJoinInfo->downstreamFetchDone[1] = false; + pJoinInfo->resRows = 0; + pOperator->status = OP_OPENED; } SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { if (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1]) { + qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); return NULL; } else { resetMergeJoinOperator(pOperator); + qError("start new merge join"); } } @@ -785,7 +802,15 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { break; } } - return (pRes->info.rows > 0) ? pRes : NULL; + + if (pRes->info.rows > 0) { + pJoinInfo->resRows += pRes->info.rows; + qError("merge join returns res rows:%" PRId64, pRes->info.rows); + return pRes; + } else { + qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); + return NULL; + } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a45621bf71..7a03dd85a7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -805,10 +805,11 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { } STableKeyInfo info = {.groupId = 0}; + int32_t tableIdx = 0; for (int32_t i = 0; i < num; ++i) { uint64_t* pUid = taosArrayGet(pParam->pUidList, i); - if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t))) { + if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &tableIdx, sizeof(int32_t))) { if (TSDB_CODE_DUP_KEY == terrno) { continue; } @@ -821,6 +822,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { return TSDB_CODE_OUT_OF_MEMORY; } + tableIdx++; qError("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo)); } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 0a4470da00..e086d89d0e 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -387,7 +387,7 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); -int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); +int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask); int32_t qwDropTask(QW_FPARAMS_DEF); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); int32_t qwOpenRef(void); @@ -400,7 +400,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx); int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwDbgDumpMgmtInfo(SQWorker *mgmt); -int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); +int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore, bool dynamicTask); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwAddTaskCtx(QW_FPARAMS_DEF); void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index f2e48918ab..09a3af295c 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -17,7 +17,7 @@ SQWDebug gQWDebug = {.lockEnable = false, .sleepSimulate = false, .forceStop = false}; -int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { +int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore, bool dynamicTask) { if (!gQWDebug.statusEnable) { return TSDB_CODE_SUCCESS; } @@ -25,7 +25,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, int32_t code = 0; if (oriStatus == newStatus) { - if (newStatus == JOB_TASK_STATUS_EXEC || newStatus == JOB_TASK_STATUS_FAIL) { + if (dynamicTask || newStatus == JOB_TASK_STATUS_EXEC || newStatus == JOB_TASK_STATUS_FAIL) { *ignore = true; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 303eaf38ff..13a1a096b0 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -45,7 +45,7 @@ char *qwBufStatusStr(int32_t bufStatus) { return "UNKNOWN"; } -int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { +int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status, bool dynamicTask) { int32_t code = 0; int8_t origStatus = 0; bool ignore = false; @@ -53,7 +53,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { while (true) { origStatus = atomic_load_8(&task->status); - QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore)); + QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), origStatus, status, &ignore, dynamicTask)); if (ignore) { break; } @@ -381,7 +381,7 @@ _return: QW_RET(code); } -int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { +int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -389,7 +389,7 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status) { QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task)); - QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status)); + QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status, dynamicTask)); _return: @@ -417,7 +417,7 @@ int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) { return TSDB_CODE_SUCCESS; } - QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC)); + QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask)); QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ab676076f1..328195b025 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -345,7 +345,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, pOutput->numOfRows); if (!ctx->dynamicTask) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); } if (NULL == rsp) { @@ -391,7 +391,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask); break; } @@ -496,6 +496,8 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd); return TSDB_CODE_ACTION_IN_PROGRESS; } + + dsReset(ctx->sinkHandle); qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); @@ -544,7 +546,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(ctx->rspCode); } - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask)); break; } case QW_PHASE_PRE_FETCH: { @@ -653,7 +655,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp _return: if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC, ctx->dynamicTask); ctx->queryGotData = true; } @@ -679,7 +681,7 @@ _return: } if (code) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); } QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code)); @@ -926,7 +928,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } else if (QW_QUERY_RUNNING(ctx)) { atomic_store_8((int8_t *)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC, ctx->dynamicTask); atomic_store_8((int8_t *)&ctx->queryInQueue, 1); @@ -989,7 +991,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP, ctx->dynamicTask); } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropped = true; @@ -1007,7 +1009,7 @@ _return: QW_UPDATE_RSP_CODE(ctx, code); } - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL); + qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL, ctx->dynamicTask); } if (locked) {