diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 29759bc561..cb10aeb6a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -801,7 +801,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts); // timer int32_t streamTimerGetInstance(tmr_h* pTmr); -void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, +void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId, const char* pMsg); void streamTmrStop(tmr_h tmrId); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 76e74db33f..2688617823 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -303,13 +303,8 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref); - - if (pInfo->checkRspTmr == NULL) { - pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); - } else { - streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, - "check-status-monitor"); - } + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, + "check-status-monitor"); streamMutexUnlock(&pInfo->checkInfoLock); } @@ -860,7 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) { handleTimeoutDownstreamTasks(pTask, pTimeoutList); } - streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, "check-status-monitor"); streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 1a8b5c1028..769116264d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -347,12 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - - if (pTmrInfo->tmrHandle == NULL) { - pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer); - } else { - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - } + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "trigger-recv-monitor"); pTmrInfo->launchChkptId = pActiveInfo->activeId; } else { // already launched, do nothing stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); @@ -893,48 +889,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -void checkpointTriggerMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - int64_t now = taosGetTimestampMs(); - const char* id = pTask->id.idStr; - +static int32_t doChkptStatusCheck(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; - - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - // check the status every 100ms - if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (++pTmrInfo->activeCounter < 50) { - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - return; - } - - pTmrInfo->activeCounter = 0; - stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); - - streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { @@ -942,48 +901,44 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - streamMutexUnlock(&pTask->lock); - - streamMutexLock(&pActiveInfo->lock); - - // send msg to retrieve checkpoint trigger msg - SArray* pList = pTask->upstreamInfo.pList; - SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); - if (pNotSendList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); - streamMutexUnlock(&pActiveInfo->lock); - - stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); - return; +// streamMutexUnlock(&pTask->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; } if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; +// streamMutexUnlock(&pActiveInfo->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; +// streamMutexUnlock(&pActiveInfo->lock); +// streamMetaReleaseTask(pTask->pMeta, pTask); + return -1; + } + + return 0; +} + +static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) { + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); + if (pNotSendList == NULL) { + stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + return terrno; } for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { @@ -1007,13 +962,87 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { void* px = taosArrayPush(pNotSendList, pInfo); if (px == NULL) { stError("s-task:%s failed to record not send info, code: out of memory", id); + taosArrayDestroy(pNotSendList); + return terrno; } } } + *ppNotSendList = pNotSendList; + return 0; +} + +void checkpointTriggerMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + int64_t now = taosGetTimestampMs(); + const char* id = pTask->id.idStr; + + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // check the status every 100ms + if (streamTaskShouldStop(pTask)) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (++pTmrInfo->activeCounter < 50) { + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + return; + } + + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); + + streamMutexLock(&pTask->lock); + SStreamTaskState state = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + + if (state.state != TASK_STATUS__CK) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, + vgId, state.name, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamMutexLock(&pActiveInfo->lock); + + int32_t code = doChkptStatusCheck(pTask); + if (code) { + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + // send msg to retrieve checkpoint trigger msg + SArray* pList = pTask->upstreamInfo.pList; + SArray* pNotSendList = NULL; + + code = doFindNotSendUpstream(pTask, pList, &pNotSendList); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + + taosArrayDestroy(pNotSendList); + return; + } + // do send retrieve checkpoint trigger msg to upstream int32_t size = taosArrayGetSize(pNotSendList); - int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + code = doSendRetrieveTriggerMsg(pTask, pNotSendList); if (code) { stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); } @@ -1023,7 +1052,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { // check every 100ms if (size > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5f7b8dd39e..0f0015c7d9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -634,12 +634,8 @@ static void doMonitorDispatchData(void* param, void* tmrId) { void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { int32_t vgId = pTask->pMeta->vgId; - if (pTask->msgInfo.pRetryTmr != NULL) { - streamTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, - "dispatch-monitor-tmr"); - } else { - pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer); - } + streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, + "dispatch-monitor"); } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId, @@ -888,77 +884,42 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 return TSDB_CODE_SUCCESS; } -static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - const char* id = pTask->id.idStr; +static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + const char* id = pTask->id.idStr; + int32_t vgId = pTask->pMeta->vgId; - // check the status every 100ms - if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - if (++pTmrInfo->activeCounter < 50) { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "chkpt-ready-monitor"); - return; - } - - pTmrInfo->activeCounter = 0; - stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); - - streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, - pState.name, ref); - streamMutexUnlock(&pTask->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - streamMutexUnlock(&pTask->lock); - - streamMutexLock(&pActiveInfo->lock); - - SArray* pList = pActiveInfo->pReadyMsgList; - int32_t num = taosArrayGetSize(pList); if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", id, vgId, pTmrInfo->launchChkptId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); - - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return -1; } - SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); - if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id, vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref); + return -1; + } - streamMetaReleaseTask(pTask->pMeta, pTask); - return; + return 0; +} + +static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level, + const char* id) { + SArray* pTmp = taosArrayInit(4, sizeof(int32_t)); + if (pTmp == NULL) { + return terrno; } for (int32_t i = 0; i < num; ++i) { @@ -971,63 +932,138 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { continue; } - void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId); + void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId); if (p == NULL) { stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); + return terrno; } else { stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, - pTask->info.taskLevel, pInfo->upstreamTaskId); + level, pInfo->upstreamTaskId); } } - int32_t checkpointId = pActiveInfo->activeId; + *ppNotRspList = pTmp; + return 0; +} - int32_t notRsp = taosArrayGetSize(pNotRspList); - if (notRsp > 0) { // send checkpoint-ready msg again - for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) { - int32_t* pTaskId = taosArrayGet(pNotRspList, i); - if (pTaskId == NULL) { +static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t checkpointId, SArray* pReadyList) { + int32_t code = 0; + int32_t num = taosArrayGetSize(pReadyList); + const char* id = pTask->id.idStr; + + for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) { + int32_t* pTaskId = taosArrayGet(pNotRspList, i); + if (pTaskId == NULL) { + continue; + } + + for (int32_t j = 0; j < num; ++j) { + STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pReadyList, j); + if (pReadyInfo == NULL) { continue; } - for (int32_t j = 0; j < num; ++j) { - STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j); - if (pReadyInfo == NULL) { - continue; - } + if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again - if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again - - SRpcMsg msg = {0}; - int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, - pReadyInfo->childId, checkpointId, &msg); + SRpcMsg msg = {0}; + code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, + pReadyInfo->childId, checkpointId, &msg); + if (code == TSDB_CODE_SUCCESS) { + code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg); if (code == TSDB_CODE_SUCCESS) { - code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg); - if (code == TSDB_CODE_SUCCESS) { - stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel, - pReadyInfo->upstreamTaskId); - } else { - stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); - } + stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel, + pReadyInfo->upstreamTaskId); } else { - stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); + stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id); } + } else { + stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id); } } } + } +} - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, +static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { + SStreamTask* pTask = param; + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + SArray* pNotRspList = NULL; + + // check the status every 100ms + if (streamTaskShouldStop(pTask)) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + if (++pTmrInfo->activeCounter < 50) { + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "chkpt-ready-monitor"); + return; + } + + // reset tmr + pTmrInfo->activeCounter = 0; + stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id); + + streamMutexLock(&pTask->lock); + SStreamTaskState state = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + + // 1. check status in the first place + if (state.state != TASK_STATUS__CK) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId, + state.name, ref); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + streamMutexLock(&pActiveInfo->lock); + + SArray* pList = pActiveInfo->pReadyMsgList; + int32_t num = taosArrayGetSize(pList); + int32_t code = doTaskChkptStatusCheck(pTask, num); + if (code) { + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } + + code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, + tstrerror(code), ref); + streamMutexUnlock(&pActiveInfo->lock); + streamMetaReleaseTask(pTask->pMeta, pTask); + + taosArrayDestroy(pNotRspList); + return; + } + + int32_t checkpointId = pActiveInfo->activeId; + int32_t notRsp = taosArrayGetSize(pNotRspList); + doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + + if (notRsp > 0) { // send checkpoint-ready msg again + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); streamMutexUnlock(&pActiveInfo->lock); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug( - "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " - "and quit from timer, ref:%d", + "s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit " + "from timer, ref:%d", id, vgId, ref); streamClearChkptReadyMsg(pActiveInfo); streamMutexUnlock(&pActiveInfo->lock); + // release should be the last execution, since pTask may be destroy after it immidiately. streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1085,12 +1121,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - if (pTmrInfo->tmrHandle == NULL) { - pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer); - } else { - streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "chkpt-ready-monitor"); - } + streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "chkpt-ready-monitor"); // mark the timer monitor checkpointId pTmrInfo->launchChkptId = pActiveInfo->activeId; diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 703e6a3256..72d2e89936 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -279,7 +279,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, "meta-hb-tmr"); code = taosReleaseRef(streamMetaId, rid); @@ -301,7 +301,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } streamMetaRUnLock(pMeta); - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); code = taosReleaseRef(streamMetaId, rid); @@ -317,7 +317,7 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { return terrno; } - pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb"); pInfo->tickCounter = 0; pInfo->msgSendTs = -1; pInfo->hbCount = 0; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 2d54547aa2..63e24b0975 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -20,13 +20,14 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { + int64_t delaySchema = pTask->info.delaySchedParam; + if (delaySchema != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, pTask->info.delaySchedParam); - pTask->schedInfo.pDelayTimer = - taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); + streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, + pTask->pMeta->vgId, "sched-tmr"); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } } @@ -76,13 +77,8 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { // add one ref count for task streamMetaAcquireOneTask(pTask); - - if (pTask->schedInfo.pIdleTimer == NULL) { - pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer); - } else { - streamTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, - &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr"); - } + streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, + pTask->pMeta->vgId, "resume-task-tmr"); } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -125,7 +121,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger); if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { - stDebug("s-task:%s jump out of schedTimer", id); + stDebug("s-task:%s should stop, jump out of schedTimer", id); return; } @@ -139,9 +135,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (code) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); terrno = code; - return; + goto _end; } pTrigger->type = STREAM_INPUT__GET_RES; @@ -149,10 +144,9 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (pTrigger->pBlock == NULL) { taosFreeQitem(pTrigger); - stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + stError("s-task:%s failed to build retrieve data trigger, code:out of memory, try again in %dms", id, nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); - return; + goto _end; } atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); @@ -160,8 +154,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); - return; + stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code)); + goto _end; } code = streamTrySchedExec(pTask); @@ -171,5 +165,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } } - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); +_end: + streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + "sched-run-tmr"); } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 21f0168434..1290aef6a3 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -80,6 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { } void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { + int32_t vgId = pTask->pMeta->vgId; int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { numOfTicks = 1; @@ -100,14 +101,8 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref); - - if (pTask->schedHistoryInfo.pTimer == NULL) { - pTask->schedHistoryInfo.pTimer = - taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); - } else { - streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); - } + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, + &pTask->schedHistoryInfo.pTimer, vgId, "history-task"); } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { @@ -337,7 +332,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); - streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, + streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } } @@ -391,7 +386,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { pHTaskInfo->tickCount -= 1; if (pHTaskInfo->tickCount > 0) { - streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, + streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); streamMetaReleaseTask(pMeta, pTask); return; @@ -519,7 +514,7 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { } stDebug("s-task:%s set timer active flag, task timer not null", idStr); - streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, + streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } @@ -621,8 +616,8 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { // release the task. streamMetaReleaseTask(pTask->pMeta, pTask); } else { - streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, + &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); } } diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 06286479a3..3709c4dfbd 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -491,8 +491,8 @@ static void keepPrevInfo(SStreamTaskSM* pSM) { int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) { SStreamTask* pTask = pSM->pTask; - const char* id = pTask->id.idStr; - int32_t code = 0; + const char* id = pTask->id.idStr; + int32_t code = 0; // do update the task status streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 5e12f51c9d..8b77fe7cb1 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -40,11 +40,23 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) { return TSDB_CODE_SUCCESS; } -void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, +void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId, const char* pMsg) { - bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); - if (ret) { + if (*pTmrId == NULL) { + *pTmrId = taosTmrStart(fp, mseconds, pParam, pHandle); + if (*pTmrId == NULL) { + stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno)); + return; + } + } else { + bool ret = taosTmrReset(fp, mseconds, pParam, pHandle, pTmrId); + if (ret) { + stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno)); + return; + } } + + stDebug("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) {