diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index fe669b7fac..629efa00b3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -272,6 +272,7 @@ typedef struct SStreamStatus { bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it int8_t pauseAllowed; // allowed task status to be set to be paused int32_t timerActive; // timer is active + int32_t inScanHistorySentinel; } SStreamStatus; typedef struct SDataRange { @@ -358,8 +359,6 @@ typedef struct STaskOutputInfo { STaskSinkSma smaSink; STaskSinkFetch fetchSink; }; - -// void* pTimer; // timer for launch sink tasks int8_t type; STokenBucket* pTokenBucket; } STaskOutputInfo; @@ -375,7 +374,7 @@ struct SStreamTask { SSTaskBasicInfo info; STaskOutputQueue outputq; STaskInputInfo inputInfo; - STaskSchedInfo schedInfo; // todo remove it + STaskSchedInfo schedInfo; STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; SStreamStatus status; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 285f4df7b4..5ec83d868f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1037,14 +1037,15 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms return code; } +// this function should be executed by only one thread int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t code = TSDB_CODE_SUCCESS; - int32_t code = TSDB_CODE_SUCCESS; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); if (pTask == NULL) { - tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", + tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed", pMeta->vgId, pReq->taskId); return -1; } @@ -1052,16 +1053,38 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step1 const char* id = pTask->id.idStr; const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus); + + // avoid multi-thread exec + while(1) { + int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1); + if (sentinel != 0) { + tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id); + taosMsleep(100); + } else { + break; + } + } if (pTask->execInfo.step1Start == 0) { ASSERT(pTask->status.pauseAllowed == false); - pTask->execInfo.step1Start = taosGetTimestampMs(); + int64_t ts = taosGetTimestampMs(); + + pTask->execInfo.step1Start = ts; + tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts); + + // NOTE: in case of stream task, scan-history data in wal is not allowed to pause if (pTask->info.fillHistory == 1) { streamTaskEnablePause(pTask); } } else { - tqDebug("s-task:%s resume from paused, start ts:%" PRId64, pTask->id.idStr, pTask->execInfo.step1Start); + if (pTask->execInfo.step2Start == 0) { + tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start); + } else { + tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start); + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } } // we have to continue retrying to successfully execute the scan history task. @@ -1070,6 +1093,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { "s-task:%s failed to start scan-history in first stream time window since already started, unexpected " "sched-status:%d", id, pTask->status.schedStatus); + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1079,16 +1103,18 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } streamScanHistoryData(pTask); + + double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; int8_t status = streamTaskSetSchedStatusInActive(pTask); tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, status); + + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); return 0; } // the following procedure should be executed, no matter status is stop/pause or not - double el = (taosGetTimestampMs() - pTask->execInfo.step1Start) / 1000.0; tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { @@ -1106,6 +1132,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s fill-history task set status to be dropping", id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); + + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); return -1; } @@ -1122,14 +1150,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // now we can stop the stream task execution - int64_t latestVer = 0; + int64_t nextProcessedVer = 0; while (1) { taosThreadMutexLock(&pStreamTask->lock); int8_t status = pStreamTask->status.taskStatus; if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { - // return; - // do nothing + // return; do nothing } if (status == TASK_STATUS__HALT) { @@ -1160,9 +1187,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->status.keepTaskStatus = status; pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); - latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + + tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s", + pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus, + id); taosThreadMutexUnlock(&pStreamTask->lock); break; @@ -1170,10 +1199,10 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range; - done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); + pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { - pTask->execInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); streamTaskPutTranstateIntoInputQ(pTask); @@ -1192,27 +1221,26 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - pTask->execInfo.step2Start = taosGetTimestampMs(); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); int64_t dstVer = pTask->dataRange.range.minVer; pTask->chkInfo.nextProcessVer = dstVer; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */streamTaskSetSchedStatusInActive(pTask); - // set the fill-history task to be normal + // the fill-history task starts to process data in wal, let's set it status to be normal now if (pTask->info.fillHistory == 1 && !streamTaskShouldStop(&pTask->status)) { streamSetStatusNormal(pTask); } tqScanWalAsync(pTq, false); } - - streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pStreamTask); + } else { STimeWindow* pWindow = &pTask->dataRange.window; @@ -1233,14 +1261,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } code = streamTaskScanHistoryDataComplete(pTask); - streamMetaReleaseTask(pMeta, pTask); - - // when all source task complete to scan history data in stream time window, they are allowed to handle stream data - // at the same time. - return code; } - return 0; + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); + streamMetaReleaseTask(pMeta, pTask); + return code; } // only the agg tasks and the sink tasks will receive this message from upstream tasks diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c4aefa40ea..e49bf74c7d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -394,12 +394,17 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { *pAdded = false; - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + STaskId id = streamTaskExtractKey(pTask); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p != NULL) { return 0; } + if (pTask->info.fillHistory == 1) { + stDebug("s-task:0x%x initial nextProcessVer is set to 1 for fill-history task", pTask->id.taskId); + ver = 1; + } + if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); return -1; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 6aacb5d2bb..cd15595411 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -792,24 +792,24 @@ int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { return qStreamInfoResetTimewindowFilter(exec); } -bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { +bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVer) { SVersionRange* pRange = &pTask->dataRange.range; - ASSERT(latestVer >= pRange->maxVer); + ASSERT(nextProcessVer >= pRange->maxVer); - int64_t nextStartVer = pRange->maxVer + 1; - if (nextStartVer > latestVer - 1) { - // no input data yet. no need to execute the secondardy scan while stream task halt + int64_t walScanStartVer = pRange->maxVer + 1; + if (walScanStartVer > nextProcessVer - 1) { + // no input data yet. no need to execute the secondary scan while stream task halt streamTaskFillHistoryFinished(pTask); stDebug( "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " "related stream task currentVer:%" PRId64, - pTask->id.idStr, latestVer); + pTask->id.idStr, nextProcessVer); return true; } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to // [pTask->dataRange.range.maxVer, ver1] - pRange->minVer = nextStartVer; - pRange->maxVer = latestVer - 1; + pRange->minVer = walScanStartVer; + pRange->maxVer = nextProcessVer - 1; return false; } } diff --git a/tests/system-test/8-stream/pause_resume_test.py b/tests/system-test/8-stream/pause_resume_test.py index 421f499a3d..484383f1ce 100644 --- a/tests/system-test/8-stream/pause_resume_test.py +++ b/tests/system-test/8-stream/pause_resume_test.py @@ -56,6 +56,9 @@ class TDTestCase: self.tdCom.create_stream(stream_name=f'{self.stb_name}{self.tdCom.stream_suffix}', des_table=self.stb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.stb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=stb_subtable_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.ctb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.ctb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.stb_source_select_str} from {self.ctb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=ctb_subtable_value, fill_history_value=fill_history_value) self.tdCom.create_stream(stream_name=f'{self.tb_name}{self.tdCom.stream_suffix}', des_table=self.tdCom.tb_stream_des_table, source_sql=f'select _wstart AS wstart, {self.tdCom.tb_source_select_str} from {self.tb_name} {partition_elm} interval({self.tdCom.dataDict["interval"]}s)', trigger_mode="at_once", subtable_value=tb_subtable_value, fill_history_value=fill_history_value) + + time.sleep(1) + for i in range(range_count): ts_value = str(date_time+self.tdCom.dataDict["interval"])+f'+{i*10}s' ts_cast_delete_value = self.tdCom.time_cast(ts_value) @@ -75,6 +78,9 @@ class TDTestCase: partition_elm = f'partition by {partition}' else: partition_elm = "" + + time.sleep(1) + # if i == int(range_count/2): if i > 2 and i % 3 == 0: for stream_name in [f'{self.stb_name}{self.tdCom.stream_suffix}', f'{self.ctb_name}{self.tdCom.stream_suffix}', f'{self.tb_name}{self.tdCom.stream_suffix}']: