From 7c6fbd77728d2fa03b358561a7f32a9e0cda326f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 14 Jun 2023 14:10:09 +0800 Subject: [PATCH] enh(stream): do some internal refactor and support secondary scan for history data. --- include/libs/executor/executor.h | 6 +- include/libs/stream/tstream.h | 8 +-- source/dnode/vnode/src/tq/tq.c | 44 +++++++++---- source/libs/executor/src/executor.c | 86 +++++++++++++++----------- source/libs/stream/src/streamRecover.c | 25 ++++---- 5 files changed, 101 insertions(+), 68 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 7a8c074283..852257f5df 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -220,11 +220,11 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); -int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo); -int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); +int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); +int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); -int32_t qStreamRestoreParam(qTaskInfo_t tinfo); +int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); void qStreamCloseTsdbReader(void* task); void resetTaskInfo(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 223fd3c3b7..0891c35716 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -451,7 +451,7 @@ typedef struct { SMsgHead msgHead; int64_t streamId; int32_t taskId; -} SStreamRecoverStep1Req, SStreamRecoverStep2Req; +} SStreamScanHistoryReq, SStreamRecoverStep2Req; typedef struct { int64_t streamId; @@ -582,9 +582,9 @@ int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); // source level -int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq); -int32_t streamSourceRecoverScanStep1(SStreamTask* pTask); +int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); +int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq); +int32_t streamSourceScanHistoryData(SStreamTask* pTask); //int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index dd36075eb1..a8d712ae01 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1066,10 +1066,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t code = TSDB_CODE_SUCCESS; char* msg = pMsg->pCont; - int32_t msgLen = pMsg->contLen; SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg; + SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); if (pTask == NULL) { @@ -1089,16 +1088,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); int64_t st = taosGetTimestampMs(); - - // todo set the correct status flag - int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, - TASK_SCHED_STATUS__WAITING); + int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, + TASK_SCHED_STATUS__WAITING); if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { ASSERT(0); return 0; } - streamSourceRecoverScanStep1(pTask); + streamSourceScanHistoryData(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); streamMetaReleaseTask(pMeta, pTask); @@ -1124,21 +1121,47 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(100); } + taosMsleep(10000); + + // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, pTask->id.idStr); // if it's an source task, extract the last version in wal. - int64_t ver = pTask->dataRange.range.maxVer; + int64_t ver = pTask->dataRange.range.maxVer + 1; int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); if (latestVer >= ver) { ver = latestVer; - } else { - ASSERT(latestVer == -1); } // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] + SVersionRange* pRange = &pTask->dataRange.range; + pRange->minVer = pRange->maxVer + 1; + pRange->maxVer = ver; + if (pRange->minVer == pRange->maxVer) { + tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pTask->id.idStr); + } else { + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 + " do secondary scan-history-data after halt the related stream task:%s", + pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr); + + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); + st = taosGetTimestampMs(); + + streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window); + + streamSourceScanHistoryData(pTask); + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + el = (taosGetTimestampMs() - st) / 1000.0; + tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el); + } // 3. notify the downstream tasks to transfer executor state after handle all history blocks. pTask->status.transferState = true; @@ -1177,7 +1200,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } return 0; - } // notify the downstream tasks to transfer executor state after handle all history blocks. diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d77323d737..f4135f58d1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -92,6 +92,7 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) { qError("join not supported for stream block scan, %s" PRIx64, id); return TSDB_CODE_APP_ERROR; } + pOperator->status = OP_NOT_OPENED; return doSetStreamOpOpen(pOperator->pDownstream[0], id); } @@ -869,12 +870,20 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { } } -int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) { +int32_t qStreamSourceScanParamForHistoryScan(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - pTaskInfo->streamInfo.fillHistoryVer = *pVerRange; - pTaskInfo->streamInfo.fillHistoryWindow = *pWindow; - pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1; + + SStreamTaskInfo* pStreamInfo = &pTaskInfo->streamInfo; + + pStreamInfo->fillHistoryVer = *pVerRange; + pStreamInfo->fillHistoryWindow = *pWindow; + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE1; + + qDebug("%s set param for stream scanner for scan history data, Ver:%" PRId64 " - %" PRId64 ", window:%" PRId64 + " - %" PRId64, + GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, + pWindow->ekey); return 0; } @@ -893,55 +902,58 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) { return 0; } -int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { +int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; while (1) { - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { + int32_t type = pOperator->operatorType; + if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0); + STimeWindowAggSupp* pSup = &pInfo->twAggSup; - qInfo("save stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); + ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger; - pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; - pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pInfo->twAggSup.deleteMark = INT64_MAX; + qInfo("save stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + pSup->calTriggerSaved = pSup->calTrigger; + pSup->deleteMarkSaved = pSup->deleteMark; + pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; + pSup->deleteMark = INT64_MAX; pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; pInfo->ignoreExpiredData = false; - } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + STimeWindowAggSupp* pSup = &pInfo->twAggSup; - ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0); - qInfo("save stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); + ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger; - pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; - pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pInfo->twAggSup.deleteMark = INT64_MAX; + qInfo("save stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + pSup->calTriggerSaved = pSup->calTrigger; + pSup->deleteMarkSaved = pSup->deleteMark; + pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; + pSup->deleteMark = INT64_MAX; pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; pInfo->ignoreExpiredData = false; - } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || - pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0); + STimeWindowAggSupp* pSup = &pInfo->twAggSup; - qInfo("save stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); + ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); - pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger; - pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; - pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - pInfo->twAggSup.deleteMark = INT64_MAX; + qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + + pSup->calTriggerSaved = pSup->calTrigger; + pSup->deleteMarkSaved = pSup->deleteMark; + pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; + pSup->deleteMark = INT64_MAX; pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; pInfo->ignoreExpiredData = false; } @@ -962,7 +974,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { return 0; } -int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { +int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f59ffa68a0..38c6ad6f29 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -36,11 +36,11 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { pRange->minVer, pRange->maxVer); streamSetParamForScanHistoryData(pTask); - streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window); + streamSetParamForStreamScanner(pTask, pRange, &pTask->dataRange.window); - SStreamRecoverStep1Req req; + SStreamScanHistoryReq req; streamBuildSourceRecover1Req(pTask, &req); - int32_t len = sizeof(SStreamRecoverStep1Req); + int32_t len = sizeof(SStreamScanHistoryReq); void* serializedReq = rpcMallocCont(len); if (serializedReq == NULL) { @@ -242,13 +242,13 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs // common int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - return qStreamSetParamForRecover(exec); + qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); + return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } int32_t streamRestoreParam(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - return qStreamRestoreParam(exec); + qDebug("s-task:%s restore operator param after scan-history-data", pTask->id.idStr); + return qRestoreStreamOperatorOption(pTask->exec.pExecutor); } int32_t streamSetStatusNormal(SStreamTask* pTask) { @@ -258,19 +258,18 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } // source -int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { - void* exec = pTask->exec.pExecutor; - return qStreamSourceRecoverStep1(exec, pVerRange, pWindow); +int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { + return qStreamSourceScanParamForHistoryScan(pTask->exec.pExecutor, pVerRange, pWindow); } -int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) { +int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq) { pReq->msgHead.vgId = pTask->info.nodeId; pReq->streamId = pTask->id.streamId; pReq->taskId = pTask->id.taskId; return 0; } -int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { +int32_t streamSourceScanHistoryData(SStreamTask* pTask) { return streamScanExec(pTask, 100); } @@ -393,7 +392,7 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask) { int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; - if (qStreamRestoreParam(exec) < 0) { + if (qRestoreStreamOperatorOption(exec) < 0) { return -1; } if (qStreamRecoverFinish(exec) < 0) {