diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 925bfa24c6..9a72f785ae 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -602,7 +602,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); // agg level int32_t streamAggRecoverPrepare(SStreamTask* pTask); -int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); +int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId); void streamMetaInit(); void streamMetaCleanup(); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7146ba3f5b..d2fcf166b4 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -292,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { return -1; } // do process request - if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { + if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) { streamMetaReleaseTask(pSnode->pMeta, pTask); return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f3e0bcaa0c..d474f0ca90 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -894,8 +894,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, - pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel); + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d, scan-history:%d", + vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel, + pTask->info.fillHistory); // next valid version will add one pTask->chkInfo.version += 1; @@ -1072,7 +1073,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); + const char* pId = pTask->id.idStr; + tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pId, + streamGetTaskStatusStr(pTask->status.taskStatus)); int64_t st = taosGetTimestampMs(); int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, @@ -1085,15 +1088,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { streamSourceScanHistoryData(pTask); } + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr); + tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaReleaseTask(pMeta, pTask); return 0; } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); + tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pId, el); if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; @@ -1118,7 +1122,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pTask->id.idStr); + pStreamTask->info.taskLevel, pId); // if it's an source task, extract the last version in wal. int64_t ver = pTask->dataRange.range.maxVer + 1; @@ -1135,14 +1139,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pRange->minVer == pRange->maxVer) { streamTaskRecoverSetAllStepFinished(pTask); tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", - pTask->id.idStr); + pId); } } if (!streamTaskRecoverScanStep1Finished(pTask)) { tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " do secondary scan-history-data after halt the related stream task:%s", - pTask->id.idStr, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr); + pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); st = taosGetTimestampMs(); @@ -1153,7 +1157,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if(!streamTaskRecoverScanStep2Finished(pTask)) { streamSourceScanHistoryData(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", pTask->id.idStr); + tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1161,12 +1165,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pTask->id.idStr, el); + tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el); if (!pTask->status.transferState) { // 3. notify the downstream tasks to transfer executor state after handle all history blocks. pTask->status.transferState = true; - code = streamDispatchTransferStateMsg(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle error @@ -1178,7 +1181,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTryExec(pTask); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s set status to be dropping", pTask->id.idStr); + + tqDebug("s-task:%s set status to be dropping", pId); + // transfer the ownership of executor state streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); @@ -1195,19 +1200,25 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of // history scan. The current version of chkInfo.current is not updated during the history scan + STimeWindow* pWindow = &pTask->dataRange.window; + if (pTask->historyTaskId.taskId == 0) { - pTask->dataRange.window.ekey = INT64_MAX; - pTask->dataRange.window.skey = INT64_MIN; - tqDebug("s-task:%s without associated stream task, reset the time window:%"PRId64" - %"PRId64, pTask->id.idStr, - pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; + tqDebug("s-task:%s without associated stream task, reset the time window:%" PRId64 " - %" PRId64, pId, + pWindow->skey, pWindow->ekey); } else { tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64 - ", window:%" PRId64 " - %" PRId64, - pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + ", window:%" PRId64 " - %" PRId64, pId, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } code = streamTaskScanHistoryDataComplete(pTask); streamMetaReleaseTask(pMeta, pTask); + + // let's start the stream task by extracting data from wal + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + tqStartStreamTasks(pTq); + } + return code; } @@ -1311,7 +1322,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } // do process request - if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { + if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; } @@ -1402,9 +1413,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { pTask->chkInfo.version); streamProcessRunReq(pTask); } else { - if (streamTaskShouldPause(&pTask->status)) { +// if (streamTaskShouldPause(&pTask->status)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - } +// } tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); @@ -1543,19 +1554,23 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SDecoder decoder; + SStreamRetrieveReq req; - SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); + int32_t taskId = req.dstTaskId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessRetrieveReq(pTask, &req, &rsp); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); tDeleteStreamRetrieveReq(&req); return 0; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 5043486135..a1cc74054a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -170,21 +170,18 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->info.selfChildId, + qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId:0x%" PRIx64, pTask->id.idStr, pTask->info.selfChildId, pReq->srcTaskId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->srcVgId = 0; - // decode - /*pData->blocks = pReq->data;*/ - /*pBlock->sourceVer = pReq->sourceVer;*/ streamRetrieveReqToData(pReq, pData); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { status = TASK_INPUT_STATUS__FAILED; } - } else { + } else { // todo handle oom /*streamTaskInputFail(pTask);*/ /*status = TASK_INPUT_STATUS__FAILED;*/ } @@ -199,6 +196,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, pRsp->pCont = buf; pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp); tmsgSendRsp(pRsp); + return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c42320ad13..358932cf20 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -314,7 +314,10 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - qDebug("s-task:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId); + + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + qDebug("s-task:%s status:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, + pReq->taskId, vgId); return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 9f2d6b5908..a025d01004 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -86,8 +86,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { // check status int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s check downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, - pTask->id.idStr, pRange->range.minVer, pRange->range.maxVer, pRange->window.skey, pRange->window.ekey); + STimeWindow* pWindow = &pRange->window; SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -98,13 +97,16 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 + "-%" PRId64, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, + pWindow->skey, pWindow->ekey); + req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, - req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -113,14 +115,17 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { pTask->notReadyTasks = numOfVgs; pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); + qDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64, + pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey); + for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.reqId = tGenIdPI64(); taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->info.nodeId, - req.downstreamTaskId, req.downstreamNodeId); + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle), idx:%d", pTask->id.idStr, pTask->info.nodeId, + req.downstreamTaskId, req.downstreamNodeId, i); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { @@ -303,9 +308,6 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s send scan-history-data complete msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr, - pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus)); - req.taskId = pTask->fixedEpDispatcher.taskId; streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -393,7 +395,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); - qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete fill history procedure", pTask->id.idStr, + qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete scan-history procedure", pTask->id.idStr, pTask->numOfWaitingUpstream); return 0; } @@ -412,7 +414,7 @@ int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { return 0; } -int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { +int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId) { if (pTask->info.taskLevel == TASK_LEVEL__AGG) { int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); ASSERT(left >= 0); @@ -422,7 +424,8 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks); streamAggUpstreamScanHistoryFinish(pTask); } else { - qDebug("s-task:%s remain unfinished upstream tasks:%d", pTask->id.idStr, left); + qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", + pTask->id.idStr, taskId, childId, left); } } @@ -563,11 +566,12 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { } ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY); - /*code = */streamSetStatusNormal(pTask); + // ready to process data from inputQ + streamSetStatusNormal(pTask); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - // todo check rsp + // todo check rsp, commit data streamMetaSaveTask(pMeta, pTask); return 0; } diff --git a/tests/script/tsim/stream/sliding.sim b/tests/script/tsim/stream/sliding.sim index 05eb7dacba..7617233dc6 100644 --- a/tests/script/tsim/stream/sliding.sim +++ b/tests/script/tsim/stream/sliding.sim @@ -576,7 +576,6 @@ $loop_count = 0 print step 7 - sql create database test3 vgroups 6; sql use test3; sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);