fix(stream): add lock for when set checkpoint dispatch msg.
This commit is contained in:
parent
9ca84091df
commit
24559725da
|
@ -850,12 +850,18 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||||
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
|
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
|
||||||
|
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
|
||||||
|
pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
|
||||||
streamTaskClearCheckInfo(pTask, true);
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
streamTaskSetStatusReady(pTask);
|
streamTaskSetStatusReady(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -947,9 +947,14 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
|
||||||
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
||||||
|
|
||||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
if (delayDispatch) {
|
if (delayDispatch) {
|
||||||
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
|
||||||
|
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
clearBufferedDispatchMsg(pTask);
|
clearBufferedDispatchMsg(pTask);
|
||||||
|
|
|
@ -543,8 +543,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
|
||||||
|
|
||||||
pSM->prev.state = pSM->current;
|
pSM->prev.state = pSM->current;
|
||||||
pSM->prev.evt = 0;
|
pSM->prev.evt = 0;
|
||||||
|
|
||||||
|
@ -552,8 +550,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
|
||||||
pSM->startTs = taosGetTimestampMs();
|
pSM->startTs = taosGetTimestampMs();
|
||||||
pSM->pActiveTrans = NULL;
|
pSM->pActiveTrans = NULL;
|
||||||
taosArrayClear(pSM->pWaitingEventList);
|
taosArrayClear(pSM->pWaitingEventList);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
|
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
|
||||||
|
|
Loading…
Reference in New Issue