fix(stream): not restart for reset task status.
This commit is contained in:
parent
998421e5ad
commit
db4a00c74e
|
@ -2701,8 +2701,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("vgId:%d meta-stored checkpointId for stream:0x%" PRIx64 " %s is:%" PRId64, req.nodeId, req.streamId,
|
mInfo("vgId:%d stream:0x%" PRIx64 " %s meta-stored checkpointId:%" PRId64 " stream:0x%" PRIx64 " %s", req.nodeId,
|
||||||
pStream->name, pStream->checkpointId);
|
req.streamId, pStream->name, pStream->checkpointId);
|
||||||
|
|
||||||
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
|
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
|
||||||
if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly
|
if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly
|
||||||
|
@ -2730,8 +2730,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (chkId == req.checkpointId) {
|
if (chkId == req.checkpointId) {
|
||||||
mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64, req.nodeId, req.streamId,
|
mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64,
|
||||||
pStream->name, pStream->checkpointId);
|
req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId);
|
||||||
mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, chkId, &pMsg->info);
|
mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, chkId, &pMsg->info);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
|
@ -871,7 +871,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
streamTaskClearCheckInfo(pTask, true);
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
|
|
||||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||||
|
@ -886,9 +885,10 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
streamTaskSetStatusReady(pTask);
|
streamTaskSetStatusReady(pTask);
|
||||||
} else if (pState->state == TASK_STATUS__UNINIT) {
|
} else if (pState->state == TASK_STATUS__UNINIT) {
|
||||||
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
|
// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
// ASSERT(pTask->status.downstreamReady == 0);
|
||||||
tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
|
// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
|
||||||
|
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
|
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
|
||||||
}
|
}
|
||||||
|
|
|
@ -668,6 +668,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
streamTaskCompleteCheckRsp(pInfo, true, id);
|
streamTaskCompleteCheckRsp(pInfo, true, id);
|
||||||
|
|
||||||
// not record the failed of the current task if try to close current vnode
|
// not record the failed of the current task if try to close current vnode
|
||||||
|
// otherwise, the put of message operation may incur invalid read of message queue.
|
||||||
if (!pMeta->closeFlag) {
|
if (!pMeta->closeFlag) {
|
||||||
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
|
||||||
}
|
}
|
||||||
|
@ -676,7 +677,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) {
|
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue