fix(stream): fix error.
This commit is contained in:
parent
8ed2836a2b
commit
c3e9590740
|
@ -82,8 +82,24 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
||||||
|
|
||||||
streamTaskResetUpstreamStageInfo(pTask);
|
streamTaskResetUpstreamStageInfo(pTask);
|
||||||
streamSetupScheduleTrigger(pTask);
|
streamSetupScheduleTrigger(pTask);
|
||||||
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
|
|
||||||
pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel);
|
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
|
||||||
|
// checkpoint ver is the kept version, handled data should be the next version.
|
||||||
|
if (pTask->chkInfo.checkpointId != 0) {
|
||||||
|
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
|
||||||
|
qInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||||
|
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
|
||||||
|
} else {
|
||||||
|
if (pTask->chkInfo.currentVer == -1) {
|
||||||
|
pTask->chkInfo.currentVer = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
qInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
|
||||||
|
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
|
||||||
|
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
|
||||||
|
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
|
pTask->info.fillHistory, pTask->triggerParam);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -107,6 +123,10 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo fix it: send msg to mnode to rollback to an existed checkpoint, and broadcast the rollback msg to all other
|
||||||
|
// computing nodes.
|
||||||
|
pSnode->pMeta->stage = 0;
|
||||||
|
|
||||||
return pSnode;
|
return pSnode;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
|
|
|
@ -270,7 +270,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
|
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
|
||||||
qDebug("s-task:%s (level:%d) dispatch check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
|
qDebug("s-task:%s (level:%d) send check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
|
||||||
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
|
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
|
|
|
@ -97,8 +97,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
if (pTask->info.fillHistory) {
|
if (pTask->info.fillHistory) {
|
||||||
streamSetParamForScanHistory(pTask);
|
streamSetParamForScanHistory(pTask);
|
||||||
|
streamTaskEnablePause(pTask);
|
||||||
}
|
}
|
||||||
streamTaskEnablePause(pTask);
|
|
||||||
streamTaskScanHistoryPrepare(pTask);
|
streamTaskScanHistoryPrepare(pTask);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
|
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
|
||||||
|
@ -204,14 +205,15 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
||||||
ASSERT(pInfo != NULL);
|
ASSERT(pInfo != NULL);
|
||||||
|
|
||||||
if (stage == -1) {
|
if (stage == -1) {
|
||||||
qDebug("s-task:%s receive msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr,
|
qDebug("s-task:%s receive check msg from upstream task:0x%x, invalid stageId:%" PRId64 ", not ready", pTask->id.idStr,
|
||||||
upstreamTaskId, stage);
|
upstreamTaskId, stage);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->stage == -1) {
|
if (pInfo->stage == -1) {
|
||||||
pInfo->stage = stage;
|
pInfo->stage = stage;
|
||||||
qDebug("s-task:%s receive msg from upstream task:0x%x, init stage value:%"PRId64, pTask->id.idStr, upstreamTaskId, stage);
|
qDebug("s-task:%s receive check msg from upstream task:0x%x, init stage value:%" PRId64, pTask->id.idStr,
|
||||||
|
upstreamTaskId, stage);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->stage < stage) {
|
if (pInfo->stage < stage) {
|
||||||
|
@ -424,14 +426,14 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
||||||
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
|
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
|
||||||
|
|
||||||
if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) {
|
if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) {
|
||||||
qError("s-task:%s not in scan-history status, return upstream:0x%x scan-history finish directly", pTask->id.idStr,
|
qError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly",
|
||||||
pReq->upstreamTaskId);
|
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->upstreamTaskId);
|
||||||
|
|
||||||
void* pBuf = NULL;
|
void* pBuf = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
SRpcMsg msg = {0};
|
|
||||||
|
|
||||||
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
|
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
|
||||||
|
|
||||||
|
SRpcMsg msg = {.info = *pRpcInfo};
|
||||||
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
|
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
|
||||||
|
|
||||||
tmsgSendRsp(&msg);
|
tmsgSendRsp(&msg);
|
||||||
|
@ -466,7 +468,10 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
||||||
streamTaskEnablePause(pTask);
|
streamTaskEnablePause(pTask);
|
||||||
int32_t code = streamTaskScanHistoryDataComplete(pTask);
|
int32_t code = streamTaskScanHistoryDataComplete(pTask);
|
||||||
} else { // for sink task, set normal
|
} else { // for sink task, set normal
|
||||||
streamSetStatusNormal(pTask);
|
if (pTask->status.taskStatus != TASK_STATUS__PAUSE && pTask->status.taskStatus != TASK_STATUS__STOP &&
|
||||||
|
pTask->status.taskStatus != TASK_STATUS__DROPPING) {
|
||||||
|
streamSetStatusNormal(pTask);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
|
qDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
|
||||||
|
|
Loading…
Reference in New Issue