Merge pull request #22260 from taosdata/fix/3_liaohj

fix(stream): kill task when pause the stream execution.
This commit is contained in:
Haojun Liao 2023-07-31 22:55:36 +08:00 committed by GitHub
commit 7e8ecceada
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 77 additions and 54 deletions

View File

@ -604,13 +604,10 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common // common
int32_t streamSetParamForScanHistory(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status); const char* streamGetTaskStatusStr(int32_t status);
@ -624,7 +621,6 @@ void streamTaskEnablePause(SStreamTask* pTask);
// source level // source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);

View File

@ -1567,9 +1567,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId); pReq->taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1581,9 +1580,8 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already. Pause success", tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId); pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
@ -1591,14 +1589,12 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskPause(pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask); streamTaskPause(pHistoryTask);
if (pHistoryTask != NULL) {
streamMetaReleaseTask(pMeta, pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask);
} }
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1627,7 +1623,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
} }
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartRecoverTask(pTask, igUntreated); streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} else { } else {

View File

@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;

View File

@ -292,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
} }
} }
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo: destroy this task here // todo: destroy the fill-history task here
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
pTask->streamTaskId.taskId); pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
@ -338,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
} }
// expand the query time window for stream scanner // 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
// transfer the ownership of executor state // 2. transfer the ownership of executor state
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info // 3. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0; pStreamTask->historyTaskId.taskId = 0;
// 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
// free it and remove it from disk meta-store // 5. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, taskId); streamMetaUnregisterTask(pMeta, taskId);
// save to disk // 6. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// pause allowed // 7. pause allowed.
streamTaskEnablePause(pStreamTask); streamTaskEnablePause(pStreamTask);
streamSchedExec(pStreamTask); streamSchedExec(pStreamTask);
@ -373,6 +375,26 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
if (!pTask->status.transferState) {
return code;
}
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask);
streamTaskEndScanWAL(pTask);
} else if (level == TASK_LEVEL__AGG) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
}
return code;
}
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
const char* id) { const char* id) {
int32_t retryTimes = 0; int32_t retryTimes = 0;
@ -526,17 +548,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
// 3. notify downstream tasks to transfer executor state after handle all history blocks. // 1. notify all downstream tasks to transfer executor state after handle all history blocks.
pTask->status.transferState = true;
int32_t code = streamDispatchTransferStateMsg(pTask); int32_t code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// todo handle error // todo handle error
} }
// the last execution of fill-history task, in order to transfer task operator states. // 2. do transfer stream task operator states.
code = streamTransferStateToStreamTask(pTask); pTask->status.transferState = true;
if (code != TSDB_CODE_SUCCESS) { // todo handle this code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle error
return code; return code;
} }
@ -560,9 +581,11 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here // todo the task should be commit here
if (taosQueueEmpty(pTask->inputQueue->queue)) { if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed // fill-history WAL scan has completed
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { if (pTask->status.transferState) {
streamTaskRecoverSetAllStepFinished(pTask); code = streamTransferStateToStreamTask(pTask);
streamTaskEndScanWAL(pTask); if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else { } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),

View File

@ -17,23 +17,30 @@
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
static void launchFillHistoryTask(SStreamTask* pTask); typedef struct SStreamTaskRetryInfo {
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); SStreamMeta* pMeta;
int32_t taskId;
} SStreamTaskRetryInfo;
static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) {
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1; pTask->status.downstreamReady = 1;
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init);
qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s",
pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus));
} }
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
SStreamScanHistoryReq req; SStreamScanHistoryReq req;
streamBuildSourceRecover1Req(pTask, &req, igUntreated); initScanHistoryReq(pTask, &req, igUntreated);
int32_t len = sizeof(SStreamScanHistoryReq);
int32_t len = sizeof(SStreamScanHistoryReq);
void* serializedReq = rpcMallocCont(len); void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) { if (serializedReq == NULL) {
return -1; return -1;
@ -65,9 +72,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
streamSetParamForScanHistory(pTask); streamSetParamForScanHistory(pTask);
} }
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartRecoverTask(pTask, 0); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
int32_t code = streamStartScanHistoryAsync(pTask, 0);
return code; return code;
} }
@ -142,7 +149,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
} else { } else {
qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskSetForReady(pTask, 0); streamTaskSetReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask); streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask); streamTaskLaunchScanHistory(pTask);
@ -188,7 +195,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
} }
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamTaskSetForReady(pTask, numOfReqs); streamTaskSetReady(pTask, numOfReqs);
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int8_t status = pTask->status.taskStatus; int8_t status = pTask->status.taskStatus;
@ -319,7 +326,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *p
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow); return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
} }
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) { int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
pReq->msgHead.vgId = pTask->info.nodeId; pReq->msgHead.vgId = pTask->info.nodeId;
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
@ -524,11 +531,6 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
streamTaskDoCheckDownstreamTasks(pHTask); streamTaskDoCheckDownstreamTasks(pHTask);
} }
typedef struct SStreamTaskRetryInfo {
SStreamMeta* pMeta;
int32_t taskId;
} SStreamTaskRetryInfo;
static void tryLaunchHistoryTask(void* param, void* tmrId) { static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTaskRetryInfo* pInfo = param; SStreamTaskRetryInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta; SStreamMeta* pMeta = pInfo->pMeta;
@ -638,7 +640,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
} }
} }
// dispatch recover finish req to all related downstream task // dispatch scan-history finish req to all related downstream task
code = streamDispatchScanHistoryFinishMsg(pTask); code = streamDispatchScanHistoryFinishMsg(pTask);
if (code < 0) { if (code < 0) {
return -1; return -1;
@ -647,7 +649,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
return 0; return 0;
} }
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor; void* exec = pTask->exec.pExecutor;
return qStreamInfoResetTimewindowFilter(exec); return qStreamInfoResetTimewindowFilter(exec);
} }
@ -659,7 +661,7 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
int64_t nextStartVer = pRange->maxVer + 1; int64_t nextStartVer = pRange->maxVer + 1;
if (nextStartVer > latestVer - 1) { if (nextStartVer > latestVer - 1) {
// no input data yet. no need to execute the secondardy scan while stream task halt // no input data yet. no need to execute the secondardy scan while stream task halt
streamTaskRecoverSetAllStepFinished(pTask); streamTaskFillHistoryFinished(pTask);
qDebug( qDebug(
"s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
"related stream task currentVer:%" PRId64, "related stream task currentVer:%" PRId64,
@ -674,7 +676,6 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) {
} }
} }
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
@ -847,7 +848,7 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep(100); taosMsleep(100);
} }
// todo: use the lock of the task. // todo: use the task lock, stead of meta lock
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus; status = pTask->status.taskStatus;
@ -861,6 +862,12 @@ void streamTaskPause(SStreamTask* pTask) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// in case of fill-history task, stop the tsdb file scan operation.
if (pTask->info.fillHistory == 1) {
void* pExecutor = pTask->exec.pExecutor;
qKillTask(pExecutor, TSDB_CODE_SUCCESS);
}
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);