From c9fa170e65b631218a22cce1dcefceb5cf1cc215 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jul 2023 16:31:46 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 36 ++++-------------------- source/libs/stream/src/streamExec.c | 38 +++++++++++++++++++------- source/libs/stream/src/streamRecover.c | 35 +----------------------- 3 files changed, 35 insertions(+), 74 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5c4f4048d1..887620c8fa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1066,8 +1066,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step 1 const char* id = pTask->id.idStr; - tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); int64_t st = taosGetTimestampMs(); int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, @@ -1112,6 +1112,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); + // todo remove this // wait for the stream task get ready for scan history data while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { @@ -1168,20 +1169,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 5. resume the related stream task. streamTryExec(pTask); - pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", id); - - streamMetaSaveTask(pMeta, pTask); - streamMetaSaveTask(pMeta, pStreamTask); - streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pStreamTask); - - taosWLockLatch(&pMeta->lock); - if (streamMetaCommit(pTask->pMeta) < 0) { - // persist to disk - } - taosWUnLockLatch(&pMeta->lock); } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of @@ -1241,22 +1230,9 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { } // transfer the ownership of executor state - tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr); + tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr); + ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); - // related stream task load the state from the state storage backend - SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); - if (pStreamTask == NULL) { - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId); - return -1; - } - - // when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure. - streamTaskReleaseState(pTask); - streamTaskReloadState(pStreamTask); - streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask); - - ASSERT(pTask->streamTaskId.taskId != 0); pTask->status.transferState = true; streamSchedExec(pTask); @@ -1366,7 +1342,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__SCAN_HISTORY) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06eed5fee2..0a248d0ffe 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -351,30 +351,36 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { - SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); + SStreamMeta* pMeta = pTask->pMeta; + + SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", - pTask->id.idStr, pTask->streamTaskId.taskId); + pTask->status.transferState = false; // reset this value, to avoid transfer state again + + qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, + pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); } - ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); + ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true); + STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; // It must be halted for a source stream task, since when the related scan-history-data task start scan the history // for the step 2. For a agg task + int8_t status = pStreamTask->status.taskStatus; if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { - ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT); + ASSERT(status == TASK_STATUS__HALT); } else { - ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); + ASSERT(status == TASK_STATUS__SCAN_HISTORY); pStreamTask->status.taskStatus = TASK_STATUS__HALT; - qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); + qDebug("s-task:%s halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } - // wait for the stream task to be idle + // wait for the stream task to handle all in the inputQ, and to be idle waitForTaskIdle(pTask, pStreamTask); // In case of sink tasks, no need to be halted for them. @@ -399,10 +405,23 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + // reset the status of stream task streamSetStatusNormal(pStreamTask); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); + + // save to disk + taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pTask); + streamMetaSaveTask(pMeta, pStreamTask); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pMeta->lock); + streamSchedExec(pStreamTask); - streamMetaReleaseTask(pTask->pMeta, pStreamTask); + streamMetaReleaseTask(pMeta, pStreamTask); return TSDB_CODE_SUCCESS; } @@ -480,7 +499,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); - pTask->status.transferState = false; // reset this value, to avoid transfer state again if (code != TSDB_CODE_SUCCESS) { // todo handle this return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a39741117c..ddd7ae4676 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -62,10 +62,6 @@ const char* streamGetTaskStatusStr(int32_t status) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; - -// qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr, -// pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer); -// streamSetParamForScanHistory(pTask); streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); @@ -84,12 +80,10 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { walReaderGetCurrentVer(pTask->exec.pWalReader)); } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - streamSetStatusNormal(pTask); streamSetParamForScanHistory(pTask); streamAggScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - streamSetStatusNormal(pTask); - qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr); + qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); } return 0; @@ -145,7 +139,6 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { streamTaskSetForReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); - streamTaskLaunchScanHistory(pTask); launchFillHistoryTask(pTask); } @@ -759,32 +752,6 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { return; } - // calculate the correct start time window, and start the handle the history data for the main task. -/* if (pTask->historyTaskId.taskId != 0) { - // check downstream tasks for associated scan-history-data tasks - streamLaunchFillHistoryTask(pTask); - - // launch current task - SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey + 1; - int64_t ver = pRange->range.minVer; - - pRange->window.skey = ekey; - pRange->window.ekey = INT64_MAX; - pRange->range.minVer = 0; - pRange->range.maxVer = ver; - - qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 - ", ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer); - } else { - SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 - " - %" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); - }*/ - ASSERT(pTask->status.downstreamReady == 0); // check downstream tasks for itself