From b83860372cc04a084c9dedb8c002577d8a271a55 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Aug 2023 00:50:30 +0800 Subject: [PATCH 1/4] fix(stream): dump results to sink node before paused. --- source/libs/executor/src/executor.c | 5 ----- source/libs/stream/src/streamExec.c | 5 ++++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e4ddf9ca6c..231653c728 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -186,11 +186,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); } -//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) { -// SExecTaskInfo* pTaskInfo = tinfo; -// pTaskInfo->code = code; -//} - int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d1dff0f2e7..4e39f1448a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -196,10 +196,13 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { SSDataBlock* output = NULL; uint64_t ts = 0; - if (qExecTask(exec, &output, &ts) < 0) { + code = qExecTask(exec, &output, &ts); + if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) { + qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code)); continue; } + // the generated results before fill-history task been paused, should be dispatched to sink node if (output == NULL && qStreamRecoverScanFinished(exec)) { finished = true; break; From f6ae9cf9bb1100e75a357139ebf21f3b9655f8ee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Aug 2023 10:16:14 +0800 Subject: [PATCH 2/4] fix(stream): fetch all data before paused and dump to sink node. --- include/libs/stream/tstream.h | 2 +- source/libs/executor/src/executor.c | 30 ++++++++++++++++++++--------- source/libs/stream/src/streamExec.c | 21 +++++++------------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index db0509d81d..b4ae30910c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -590,7 +590,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask); int32_t streamTaskEndScanWAL(SStreamTask* pTask); SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); -int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); +int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 231653c728..05767db286 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -647,23 +647,33 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { *pRes = NULL; int64_t curOwner = 0; - if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { + + // todo extract method + taosRLockLatch(&pTaskInfo->lock); + bool isKilled = isTaskKilled(pTaskInfo); + if (isKilled) { + clearStreamBlock(pTaskInfo->pRoot); + qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); + + taosRUnLockLatch(&pTaskInfo->lock); + return TSDB_CODE_SUCCESS; + } + + if (pTaskInfo->owner != 0) { qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; + + taosRUnLockLatch(&pTaskInfo->lock); return pTaskInfo->code; } + pTaskInfo->owner = threadId; + taosRUnLockLatch(&pTaskInfo->lock); + if (pTaskInfo->cost.start == 0) { pTaskInfo->cost.start = taosGetTimestampUs(); } - if (isTaskKilled(pTaskInfo)) { - clearStreamBlock(pTaskInfo->pRoot); - atomic_store_64(&pTaskInfo->owner, 0); - qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); - return TSDB_CODE_SUCCESS; - } - // error occurs, record the error code and return to client int32_t ret = setjmp(pTaskInfo->env); if (ret != TSDB_CODE_SUCCESS) { @@ -767,11 +777,13 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo)); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); + taosWLockLatch(&pTaskInfo->lock); while (qTaskIsExecuting(pTaskInfo)) { taosMsleep(10); } - pTaskInfo->code = rspCode; + taosWUnLockLatch(&pTaskInfo->lock); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4e39f1448a..1f126e002b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -162,7 +162,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { +int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; @@ -174,7 +174,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (streamTaskShouldPause(&pTask->status)) { double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); - return 0; + break; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -190,10 +190,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { return 0; } - if (streamTaskShouldPause(&pTask->status)) { - break; - } - SSDataBlock* output = NULL; uint64_t ts = 0; code = qExecTask(exec, &output, &ts); @@ -203,13 +199,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } // the generated results before fill-history task been paused, should be dispatched to sink node - if (output == NULL && qStreamRecoverScanFinished(exec)) { - finished = true; + if (output == NULL) { + finished = qStreamRecoverScanFinished(exec); break; - } else { - if (output == NULL) { - ASSERT(0); - } } SSDataBlock block = {0}; @@ -218,8 +210,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { taosArrayPush(pRes, &block); numOfBlocks++; - qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz); - if (numOfBlocks >= batchSz) { + if (numOfBlocks >= batchSize || code != TSDB_CODE_SUCCESS) { + qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d, code:%s", pTask->id.idStr, numOfBlocks, batchSize, + tstrerror(code)); break; } } From e11cda17c0b326877798862f27a4a8d2f32a5331 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Aug 2023 10:30:24 +0800 Subject: [PATCH 3/4] fix(stream): remove invalid check. --- source/libs/stream/src/streamExec.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1f126e002b..c83e29ae2f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -209,10 +209,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); - numOfBlocks++; - if (numOfBlocks >= batchSize || code != TSDB_CODE_SUCCESS) { - qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d, code:%s", pTask->id.idStr, numOfBlocks, batchSize, - tstrerror(code)); + if ((++numOfBlocks) >= batchSize) { + qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize); break; } } From f1e0e6491803d2a3c5ca609ff07738045cc30b05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Aug 2023 14:31:41 +0800 Subject: [PATCH 4/4] fix(stream): add more check. --- source/libs/stream/src/streamExec.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c83e29ae2f..bbd26d0442 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -304,11 +304,12 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; + // todo. the dropping status should be append to the status after the halt completed. // It must be halted for a source stream task, since when the related scan-history-data task start scan the history - // for the step 2. For a agg task + // for the step 2. int8_t status = pStreamTask->status.taskStatus; if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { - ASSERT(status == TASK_STATUS__HALT); + ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING); } else { ASSERT(status == TASK_STATUS__SCAN_HISTORY); pStreamTask->status.taskStatus = TASK_STATUS__HALT;