diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b169d82574..3306c0bb27 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,7 +45,7 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner - TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused, todo remove it? + TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 69d07a84c6..e25a8d3a71 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -925,7 +925,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupScheduleTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 - " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms, disable pause", + " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms, disable pause", vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); @@ -1099,10 +1099,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(100); } + ASSERT(pTask->status.pauseAllowed == false); + + if (pTask->info.fillHistory == 1) { + streamTaskEnablePause(pTask); + } + if (!streamTaskRecoverScanStep1Finished(pTask)) { streamSourceScanHistoryData(pTask); } + if (pTask->info.fillHistory == 1) { + streamTaskDisablePause(pTask); + } + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); @@ -1114,8 +1124,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { - streamTaskEnablePause(pTask); - SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; @@ -1123,7 +1131,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - qError("failed to find s-task:0x%x, it may have been destroyed, drop fill history task:%s", + qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); pTask->status.taskStatus = TASK_STATUS__DROPPING; @@ -1136,6 +1144,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); + // stream task in TASK_STATUS__SCAN_HISTORY can not be paused. // wait for the stream task get ready for scan history data while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug( @@ -1146,7 +1155,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, + + // todo disable the pause + tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, id); // if it's an source task, extract the last version in wal. @@ -1392,7 +1403,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__SCAN_HISTORY) { + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); @@ -1599,9 +1610,8 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { tDecoderClear(&decoder); int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask) { + if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp, false); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1609,18 +1619,22 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { taosFreeQitem(pMsg); return 0; } else { + tDeleteStreamDispatchReq(&req); } code = TSDB_CODE_STREAM_TASK_NOT_EXIST; FAIL: - if (pMsg->info.handle == NULL) return -1; + if (pMsg->info.handle == NULL) { + tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId); + return -1; + } SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); if (pRspHead == NULL) { SRpcMsg rsp = {.code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info}; - tqDebug("send dispatch error rsp, code: %x", code); + tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code)); tmsgSendRsp(&rsp); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -1636,9 +1650,10 @@ FAIL: pRsp->downstreamTaskId = htonl(req.taskId); pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; - SRpcMsg rsp = { - .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead}; - tqDebug("send dispatch error rsp, code: %x", code); + int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + SRpcMsg rsp = { .code = code, .info = pMsg->info, .contLen = len, .pCont = pRspHead}; + tqError("s-task:0x%x send dispatch error rsp, code:%s", taskId, tstrerror(code)); + tmsgSendRsp(&rsp); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 0d9edbe5f4..3f5829d3ae 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -240,7 +240,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t status = pTask->status.taskStatus; - if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + + // non-source or fill-history tasks don't need to response the WAL scan action. + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory == 1) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 090cef48de..cae7e10e3e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -262,14 +262,15 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure // happened too fast. todo handle the shuffle dispatch failure - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, - pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); - int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); - if (ret != TSDB_CODE_SUCCESS) { - + if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no-retry", pTask->id.idStr, + pRsp->downstreamTaskId, tstrerror(code)); + return code; + } else { + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, + pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); + return streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); } - - return TSDB_CODE_SUCCESS; } qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); @@ -359,6 +360,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } + int32_t msgLen = px->submit.msgLen; + int64_t ver = px->submit.ver; + int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); if (code != TSDB_CODE_SUCCESS) { streamDataSubmitDestroy(px); @@ -366,8 +370,9 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return code; } + // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0); + msgLen, ver, total, size + msgLen/1048576.0); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0654bcf69f..49b8231336 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -29,7 +29,7 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus) { bool streamTaskShouldPause(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); - return (status == TASK_STATUS__PAUSE); + return (status == TASK_STATUS__PAUSE || status == TASK_STATUS__HALT); } static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, @@ -365,10 +365,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - // todo fix race condition - streamTaskDisablePause(pTask); - streamTaskDisablePause(pStreamTask); - ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -426,7 +422,6 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { // pause allowed streamTaskEnablePause(pStreamTask); - streamTaskEnablePause(pTask); streamSchedExec(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 54688ed0cc..684e954f57 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -141,11 +141,6 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { streamTaskSetRangeStreamCalc(pTask); streamTaskLaunchScanHistory(pTask); - // enable pause when init completed. - if (pTask->historyTaskId.taskId == 0) { - streamTaskEnablePause(pTask); - } - launchFillHistoryTask(pTask); } @@ -473,7 +468,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory streamNotifyUpstreamContinue(pTask); - // sink node does not receive the pause msg from mnode + // sink node does not receive the pause msg from mnode, so does not need enable it if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamTaskEnablePause(pTask); } @@ -497,6 +492,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { streamMetaSaveTask(pMeta, pTask); taosWUnLockLatch(&pMeta->lock); + // history data scan in the stream time window finished, now let's enable the pause streamTaskEnablePause(pTask); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -758,9 +754,15 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { if (pTask->historyTaskId.taskId == 0) { SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 - " - %" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + if (pTask->info.fillHistory == 1) { + qDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 + "-%" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + } else { + qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 + "-%" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + } } else { SHistDataRange* pRange = &pTask->dataRange; @@ -808,7 +810,25 @@ void streamTaskPause(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int64_t st = taosGetTimestampMs(); - while(!pTask->status.pauseAllowed) { + + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__DROPPING) { + qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); + return; + } + + const char* str = streamGetTaskStatusStr(status); + if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) { + qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str); + return; + } + + while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { + if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); + return; + } + qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); taosMsleep(100); } @@ -826,8 +846,8 @@ void streamTaskDisablePause(SStreamTask* pTask) { // pre-condition check const char* id = pTask->id.idStr; while (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, recheck in 100ms", id); taosMsleep(100); - qDebug("s-task:%s already in pause, wait for pause being cancelled, and set pause disabled, check in 100ms", id); } qDebug("s-task:%s disable task pause", id);