From aed99da6c1dfac8bc547b6098778b39b063e6c23 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jul 2023 12:14:53 +0800 Subject: [PATCH] refactor: refactor the stream task starting order. --- include/libs/stream/tstream.h | 12 +- source/dnode/snode/src/snode.c | 6 +- source/dnode/vnode/src/tq/tq.c | 119 +++++++------- source/dnode/vnode/src/tq/tqRestore.c | 10 +- source/libs/stream/src/streamMeta.c | 4 +- source/libs/stream/src/streamRecover.c | 206 +++++++++++++++++-------- 6 files changed, 224 insertions(+), 133 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bbeeb33523..2ce640a2bc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -46,7 +46,7 @@ enum { TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused - TASK_STATUS__PAUSE, + TASK_STATUS__PAUSE, // pause }; enum { @@ -315,7 +315,7 @@ struct SStreamTask { SArray* pUpstreamEpInfoList; // SArray, // children info int32_t nextCheckId; SArray* checkpointInfo; // SArray - + int64_t initTs; // output union { STaskDispatcherFixedEp fixedEpDispatcher; @@ -572,12 +572,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); // recover and fill history -void streamPrepareNdoCheckDownstream(SStreamTask* pTask); -int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); +void streamTaskCheckDownstreamTasks(SStreamTask* pTask); +int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask); +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, + SRpcHandleInfo* pRpcInfo, int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); -int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask); +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 700b0191a5..3b19c7ae4a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -72,6 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } + pTask->initTs = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; @@ -166,11 +167,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta); taosWUnLockLatch(&pSnode->pMeta->lock); - - streamPrepareNdoCheckDownstream(pTask); qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + streamTaskCheckDownstreamTasks(pTask); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 97aa69bab6..c550b06acb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -817,11 
+817,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } + pTask->initTs = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + // backup the initial status, and set it to be TASK_STATUS__INIT pTask->chkInfo.version = ver; pTask->chkInfo.currentVer = ver; @@ -880,7 +882,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->exec.pExecutor == NULL) { return -1; } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } @@ -963,28 +964,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } - SEncoder encoder; - int32_t code; - int32_t len; - - tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code); - if (code < 0) { - tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId); - return -1; - } - - void* buf = rpcMallocCont(sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId); - - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeStreamTaskCheckRsp(&encoder, &rsp); - tEncoderClear(&encoder); - - SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; - - tmsgSendRsp(&rspMsg); - return 0; + return streamSendCheckRsp(pTq->pStreamMeta, &req, &rsp, &pMsg->info, pTask->id.taskId); } int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { @@ -1062,13 +1042,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } taosWUnLockLatch(&pStreamMeta->lock); - - // 3. It's an fill history task, do nothing. wait for the main task to start it - streamPrepareNdoCheckDownstream(pTask); - tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + // 3. It's an fill history task, do nothing. 
wait for the main task to start it + streamTaskCheckDownstreamTasks(pTask); return 0; } @@ -1087,8 +1065,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - const char* pId = pTask->id.idStr; - tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId, + const char* id = pTask->id.idStr; + tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); int64_t st = taosGetTimestampMs(); @@ -1104,14 +1082,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); + tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaReleaseTask(pMeta, pTask); return 0; } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el); + tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; @@ -1125,7 +1103,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->streamTaskId.taskId, pTask->id.idStr); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", pId); + tqDebug("s-task:%s scan-history-task set status to be dropping", id); streamMetaSaveTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask); @@ -1139,14 +1117,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug( "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", - pId, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); + id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); taosMsleep(100); } // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pId); + pStreamTask->info.taskLevel, id); // if it's an source task, extract the last version in wal. 
streamHistoryTaskSetVerRangeStep2(pTask); @@ -1154,7 +1132,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (!streamTaskRecoverScanStep1Finished(pTask)) { tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", - pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pId); + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); st = taosGetTimestampMs(); @@ -1165,7 +1143,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamSourceScanHistoryData(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); + tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1174,7 +1152,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el); + tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); // 3. notify downstream tasks to transfer executor state after handle all history blocks. if (!pTask->status.transferState) { @@ -1191,7 +1169,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTryExec(pTask); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s scan-history-task set status to be dropping", pId); + tqDebug("s-task:%s scan-history-task set status to be dropping", id); streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); @@ -1212,13 +1190,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; - tqDebug("s-task:%s no related scan-history-data task, reset the time window:%" PRId64 " - %" PRId64, pId, - pWindow->skey, pWindow->ekey); + tqDebug( + "s-task:%s scan history in current time window completed, no related fill history task, reset the time " + "window:%" PRId64 " - %" PRId64, + id, pWindow->skey, pWindow->ekey); } else { tqDebug( - "s-task:%s history data in current time window scan completed, now start to handle data from WAL, start " + "s-task:%s scan history in current time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, - pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); + id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the @@ -1452,31 +1432,56 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL return 0; } -int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { - if (pTask) { - if (!streamTaskShouldPause(&pTask->status)) { - tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } - streamMetaReleaseTask(pStreamMeta, pTask); - } else { - return -1; +// todo rule out the status when pause not suitable. 
+static int32_t tqProcessTaskPauseImpl(SStreamMeta* pStreamMeta, SStreamTask* pTask) { + if (!streamTaskShouldPause(&pTask->status)) { + tqDebug("vgId:%d s-task:%s set pause flag", pStreamMeta->vgId, pTask->id.idStr); + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); } + return 0; } int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); - int32_t code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pTask); + + SStreamMeta* pMeta = pTq->pStreamMeta; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, + pReq->taskId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr); + + int32_t code = tqProcessTaskPauseImpl(pMeta, pTask); if (code != 0) { + streamMetaReleaseTask(pMeta, pTask); return code; } - SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); - if (pHistoryTask) { - code = tqProcessTaskPauseImpl(pTq->pStreamMeta, pHistoryTask); + + SStreamTask* pHistoryTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + if (pHistoryTask == NULL) { + tqError("vgId:%d failed to acquire fill-history task:0x%x, it may have been dropped already", pMeta->vgId, + pTask->historyTaskId.taskId); + + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + tqDebug("s-task:%s fill-history task handle pause along with related stream task", pHistoryTask->id.idStr); + code = tqProcessTaskPauseImpl(pMeta, pHistoryTask); } + + streamMetaReleaseTask(pMeta, pTask); + if (pHistoryTask != NULL) { + streamMetaReleaseTask(pMeta, pHistoryTask); + } + return code; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 5db3e735cc..0d9edbe5f4 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -80,11 +80,17 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { continue; } - streamTaskCheckDownstreamTasks(pTask); + if (pTask->info.fillHistory == 1) { + tqDebug("s-task:%s fill-history task, wait for related stream task:0x%x to launch it", pTask->id.idStr, + pTask->streamTaskId.taskId); + continue; + } + + streamTaskDoCheckDownstreamTasks(pTask); streamMetaReleaseTask(pMeta, pTask); } - taosArrayDestroy(pTaskList); + taosArrayDestroy(pTaskList); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a2b5d0e396..0ef9807f8a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -266,7 +266,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); - qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); + qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); return *ppTask; } } @@ -278,7 +278,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 
1);
   if (ref > 0) {
-    qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
+    qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
   } else if (ref == 0) {
     ASSERT(streamTaskShouldStop(&pTask->status));
     tFreeStreamTask(pTask);
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 43449a1a62..852622e5a1 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -17,6 +17,18 @@
 #include "ttimer.h"
 #include "wal.h"
 
+static void launchFillHistoryTask(SStreamTask* pTask);
+static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
+
+static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) {
+  ASSERT(pTask->status.downstreamReady == 0);
+  pTask->status.downstreamReady = 1;
+  int64_t el = (taosGetTimestampMs() - pTask->initTs);
+
+  qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
+         pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus));
+}
+
 int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
   SStreamScanHistoryReq req;
   streamBuildSourceRecover1Req(pTask, &req, igUntreated);
@@ -51,9 +63,9 @@ const char* streamGetTaskStatusStr(int32_t status) {
 
 static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
   SVersionRange* pRange = &pTask->dataRange.range;
-  qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
-         pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
-
+//  qDebug("s-task:%s vgId:%d status:%s, start scan-history-data task, verRange:%" PRId64 " - %" PRId64, pTask->id.idStr,
+//         pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus), pRange->minVer, pRange->maxVer);
+//
   streamSetParamForScanHistory(pTask);
   streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
 
@@ -84,7 +96,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
 }
 
 // check status
-int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
+int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
   SHistDataRange* pRange = &pTask->dataRange;
   STimeWindow*    pWindow = &pRange->window;
 
@@ -129,11 +141,13 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
       streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
     }
   } else {
-    pTask->status.downstreamReady = 1;
-    qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
-           pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
+    qDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
+
+    streamTaskSetForReady(pTask, 0);
+    streamTaskSetRangeStreamCalc(pTask);
     streamTaskLaunchScanHistory(pTask);
+    launchFillHistoryTask(pTask);
   }
 
   return 0;
@@ -171,7 +185,28 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
 }
 
 int32_t streamTaskCheckStatus(SStreamTask* pTask) {
-  return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0;
+  return (pTask->status.downstreamReady == 1)? 
1:0; +} + +static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { + streamTaskSetForReady(pTask, numOfReqs); + const char* id = pTask->id.idStr; + + int8_t status = pTask->status.taskStatus; + const char* str = streamGetTaskStatusStr(status); + + ASSERT(status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__NORMAL); + streamTaskSetRangeStreamCalc(pTask); + + if (status == TASK_STATUS__SCAN_HISTORY) { + qDebug("s-task:%s enter into scan-history-data stage, status:%s", id, numOfReqs, str); + streamTaskLaunchScanHistory(pTask); + } else { + qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + } + + // when current stream task is ready, check the related fill history task. + launchFillHistoryTask(pTask); } int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { @@ -201,17 +236,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (left == 0) { taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; - pTask->status.downstreamReady = 1; - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, - numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus)); - streamTaskLaunchScanHistory(pTask); - } else { - ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); - qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); - } + doProcessDownstreamReadyRsp(pTask, numOfReqs); } else { int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, @@ -223,19 +249,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return -1; } - // set the downstream tasks have been checked flag - ASSERT(pTask->status.downstreamReady == 0); - pTask->status.downstreamReady = 1; - - ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL); - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); - streamTaskLaunchScanHistory(pTask); - } else { - qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id, - streamGetTaskStatusStr(pTask->status.taskStatus)); - } + doProcessDownstreamReadyRsp(pTask, 1); } } else { // not ready, wait for 100ms and retry qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId, @@ -248,6 +262,32 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, + SRpcHandleInfo *pRpcInfo, int32_t taskId) { + SEncoder encoder; + int32_t code; + int32_t len; + + tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code); + if (code < 0) { + qError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId); + return -1; + } + + void* buf = rpcMallocCont(sizeof(SMsgHead) + len); + ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, 
(uint8_t*)abuf, len); + tEncodeStreamTaskCheckRsp(&encoder, pRsp); + tEncoderClear(&encoder); + + SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo}; + + tmsgSendRsp(&rspMsg); + return 0; +} + // common int32_t streamSetParamForScanHistory(SStreamTask* pTask) { qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); @@ -434,7 +474,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { } // check if downstream tasks have been ready - streamTaskCheckDownstreamTasks(pHTask); + streamTaskDoCheckDownstreamTasks(pHTask); } typedef struct SStreamTaskRetryInfo { @@ -500,7 +540,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // todo fix the bug: 2. race condition // an fill history task needs to be started. -int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) { +int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t hTaskId = pTask->historyTaskId.taskId; @@ -675,40 +715,78 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory return 0; } -// todo handle race condition, this task may be destroyed -void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { - if (pTask->info.fillHistory) { - qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); +void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { + if (pTask->historyTaskId.taskId == 0) { + SHistDataRange* pRange = &pTask->dataRange; + qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 + " - %" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } else { - // calculate the correct start time window, and start the handle the history data for the main task. 
- if (pTask->historyTaskId.taskId != 0) { - // check downstream tasks for associated scan-history-data tasks - streamCheckHistoryTaskDownstream(pTask); + SHistDataRange* pRange = &pTask->dataRange; - // launch current task - SHistDataRange* pRange = &pTask->dataRange; - int64_t ekey = pRange->window.ekey + 1; - int64_t ver = pRange->range.minVer; + int64_t ekey = pRange->window.ekey + 1; + int64_t ver = pRange->range.minVer; - pRange->window.skey = ekey; - pRange->window.ekey = INT64_MAX; - pRange->range.minVer = 0; - pRange->range.maxVer = ver; + pRange->window.skey = ekey; + pRange->window.ekey = INT64_MAX; + pRange->range.minVer = 0; + pRange->range.maxVer = ver; - qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 - ", ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer); - } else { - SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 - ", ver range:%" PRId64 " - %" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); - } - - ASSERT(pTask->status.downstreamReady == 0); - - // check downstream tasks for itself - streamTaskCheckDownstreamTasks(pTask); + qDebug("s-task:%s level:%d related-fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 + ", verRang:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer); } } + +void launchFillHistoryTask(SStreamTask* pTask) { + int32_t tId = pTask->historyTaskId.taskId; + if (tId == 0) { + return; + } + + ASSERT(pTask->status.downstreamReady == 1); + qDebug("s-task:%s start to launch related fill-history task:0x%x", pTask->id.idStr, tId); + + // launch associated fill history task + streamLaunchFillHistoryTask(pTask); +} + +// todo handle race condition, this task may be destroyed +void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { + if (pTask->info.fillHistory) { + qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); + return; + } + + // calculate the correct start time window, and start the handle the history data for the main task. +/* if (pTask->historyTaskId.taskId != 0) { + // check downstream tasks for associated scan-history-data tasks + streamLaunchFillHistoryTask(pTask); + + // launch current task + SHistDataRange* pRange = &pTask->dataRange; + int64_t ekey = pRange->window.ekey + 1; + int64_t ver = pRange->range.minVer; + + pRange->window.skey = ekey; + pRange->window.ekey = INT64_MAX; + pRange->range.minVer = 0; + pRange->range.maxVer = ver; + + qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 + ", ver range:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, + pRange->range.maxVer); + } else { + SHistDataRange* pRange = &pTask->dataRange; + qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 + " - %" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + }*/ + + ASSERT(pTask->status.downstreamReady == 0); + + // check downstream tasks for itself + streamTaskDoCheckDownstreamTasks(pTask); +}
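For reference, the startup order this patch establishes (expand task -> check downstream readiness -> mark ready -> start the scan-history stage -> launch the related fill-history task) can be pictured with the standalone sketch below. It is only an illustration: MockStreamTask, mockSetForReady, mockProcessCheckRsp and the driver in main are hypothetical stand-ins, not code from this repository; the sketch merely mirrors how streamTaskDoCheckDownstreamTasks, streamTaskSetForReady, doProcessDownstreamReadyRsp and launchFillHistoryTask are intended to interact after this change.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>

// Hypothetical stand-in for SStreamTask, reduced to the fields the startup path touches.
typedef struct {
  const char* idStr;
  int32_t     notReadyDownstream;  // outstanding downstream check responses
  int32_t     downstreamReady;
  int64_t     initTs;              // set when the task is expanded (cf. pTask->initTs)
  int32_t     historyTaskId;       // 0 means no related fill-history task
} MockStreamTask;

static int64_t nowMs(void) { return (int64_t)time(NULL) * 1000; }

// Mirrors streamTaskSetForReady(): mark the task ready and report the init duration.
static void mockSetForReady(MockStreamTask* pTask, int32_t numOfReqs) {
  pTask->downstreamReady = 1;
  int64_t el = nowMs() - pTask->initTs;
  printf("s-task:%s all %" PRId32 " downstream ready, init completed, elapsed time:%" PRId64 "ms\n",
         pTask->idStr, numOfReqs, el);
}

// Mirrors doProcessDownstreamReadyRsp() + launchFillHistoryTask(): only the last
// downstream response flips the task to ready and triggers the follow-up steps.
static void mockProcessCheckRsp(MockStreamTask* pTask, int32_t numOfReqs) {
  if (--pTask->notReadyDownstream > 0) {
    return;  // keep waiting for the remaining downstream tasks
  }

  mockSetForReady(pTask, numOfReqs);
  printf("s-task:%s enter into scan-history-data stage\n", pTask->idStr);

  if (pTask->historyTaskId != 0) {
    printf("s-task:%s launch related fill-history task:0x%" PRIx32 "\n", pTask->idStr,
           (uint32_t)pTask->historyTaskId);
  }
}

int main(void) {
  MockStreamTask t = {.idStr = "0x1", .notReadyDownstream = 3, .downstreamReady = 0,
                      .initTs = nowMs(), .historyTaskId = 0x2};
  for (int32_t i = 0; i < 3; ++i) {
    mockProcessCheckRsp(&t, 3);  // simulate three downstream check responses arriving
  }
  return 0;
}

Each simulated response decrements the outstanding counter; only the final one marks the task ready, starts the scan-history stage, and launches the fill-history task, which is the ordering this refactor enforces for real tasks.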