diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 6ed97ac547..223fd3c3b7 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -339,7 +339,7 @@ struct SStreamTask {
SStreamState* pState; // state backend
// the followings attributes don't be serialized
- int32_t recoverTryingDownstream;
+ int32_t notReadyTasks;
int32_t numOfWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
@@ -576,7 +576,7 @@ int32_t streamTaskStartHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
// common
-int32_t streamSetParamForRecover(SStreamTask* pTask);
+int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
@@ -585,8 +585,7 @@ const char* streamGetTaskStatusStr(int32_t status);
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
-int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
-int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
+//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index 41fb50a0df..42bb606c4e 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -165,7 +165,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
// 3.go through recover steps to fill history
if (pTask->info.fillHistory) {
- streamSetParamForRecover(pTask);
+ streamSetParamForScanHistoryData(pTask);
streamAggRecoverPrepare(pTask);
}
diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h
index b3a28aea43..aeb35d9ad5 100644
--- a/source/libs/stream/inc/streamInc.h
+++ b/source/libs/stream/inc/streamInc.h
@@ -49,7 +49,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
-int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
+int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 95f68cce2d..d48bfbb189 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include
+#include "ttimer.h"
#include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024
@@ -274,8 +274,8 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
-int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
- SEpSet* pEpSet) {
+int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId,
+ SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 82b9941c96..f59ffa68a0 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -33,9 +33,9 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
- pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
+ pRange->minVer, pRange->maxVer);
- streamSetParamForRecover(pTask);
+ streamSetParamForScanHistoryData(pTask);
streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
SStreamRecoverStep1Req req;
@@ -69,11 +69,11 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
}
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask);
- streamSetParamForRecover(pTask);
+ streamSetParamForScanHistoryData(pTask);
streamAggRecoverPrepare(pTask);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
streamSetStatusNormal(pTask);
- qDebug("s-task:%s sink task convert to normal status immediately", pTask->id.idStr);
+ qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
}
return 0;
@@ -107,7 +107,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
- pTask->recoverTryingDownstream = numOfVgs;
+ pTask->notReadyTasks = numOfVgs;
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
for (int32_t i = 0; i < numOfVgs; i++) {
@@ -130,7 +130,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
return 0;
}
-int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
+int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
SStreamTaskCheckReq req = {
.reqId = pRsp->reqId,
.streamId = pRsp->streamId,
@@ -186,7 +186,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return -1;
}
- int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1);
+ int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
ASSERT(left >= 0);
if (left == 0) {
@@ -203,8 +203,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} else {
- qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
- pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left);
+ int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
+ qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
+ pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
}
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
if (pRsp->reqId != pTask->checkReqId) {
@@ -212,8 +213,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
}
ASSERT(pTask->status.checkDownstream == 0);
-
pTask->status.checkDownstream = 1;
+
ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
@@ -233,17 +234,18 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
pRsp->downstreamNodeId);
taosMsleep(100);
- streamRecheckOneDownstream(pTask, pRsp);
+ streamRecheckDownstream(pTask, pRsp);
}
return 0;
}
// common
-int32_t streamSetParamForRecover(SStreamTask* pTask) {
+int32_t streamSetParamForScanHistoryData(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamSetParamForRecover(exec);
}
+
int32_t streamRestoreParam(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamRestoreParam(exec);
@@ -272,13 +274,6 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) {
return streamScanExec(pTask, 100);
}
-int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) {
- pReq->msgHead.vgId = pTask->info.nodeId;
- pReq->streamId = pTask->id.streamId;
- pReq->taskId = pTask->id.taskId;
- return 0;
-}
-
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
void* exec = pTask->exec.pExecutor;
const char* id = pTask->id.idStr;
@@ -305,7 +300,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));
req.taskId = pTask->fixedEpDispatcher.taskId;
- streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
+ streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
@@ -315,7 +310,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.taskId = pVgInfo->taskId;
- streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
+ streamDoDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
}