fix(stream): add ref for task in check rsp monitor timer.

This commit is contained in:
Haojun Liao 2024-04-26 19:11:57 +08:00
parent 6d06188b9f
commit 2d1e9ba631
3 changed files with 31 additions and 8 deletions

View File

@ -848,6 +848,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask); bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask);
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id);
void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo);

View File

@ -55,7 +55,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
// serialize streamProcessScanHistoryFinishRsp // serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask); streamTaskPrepareMonitorCheckRsp(pTask);
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
@ -69,8 +69,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask); streamTaskStartMonitorCheckRsp(pTask);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamTaskPrepareMonitorCheckRsp(pTask);
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
@ -89,6 +90,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
// the check rsp monitor timer must be invoked here
streamTaskStartMonitorCheckRsp(pTask);
} else { // for sink task, set it ready directly. } else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
@ -161,18 +165,27 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return 0; return 0;
} }
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask) {
/*SStreamTask* p = */streamMetaAcquireOneTask(pTask); // add task ref here
STaskCheckInfo* pInfo = &pTask->taskCheckInfo; STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock); taosThreadMutexLock(&pInfo->checkInfoLock);
int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
taosThreadMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
taosThreadMutexLock(&pInfo->checkInfoLock);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
@ -183,7 +196,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
} }
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
@ -329,7 +342,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
} }
stDebug("s-task:%s set the in-check-procedure flag", id); stDebug("s-task:%s set the in-check-procedure flag", id);
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) { int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) {
@ -519,6 +532,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
void rspMonitorFn(void* param, void* tmrId) { void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
SStreamMeta* pMeta = pTask->pMeta;
SStreamTaskState* pStat = streamTaskGetStatus(pTask); SStreamTaskState* pStat = streamTaskGetStatus(pTask);
STaskCheckInfo* pInfo = &pTask->taskCheckInfo; STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
@ -546,6 +560,8 @@ void rspMonitorFn(void* param, void* tmrId) {
STaskId* pHId = &pTask->hTaskInfo.id; STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
} }
streamMetaReleaseTask(pMeta, pTask);
return; return;
} }
@ -554,6 +570,7 @@ void rspMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id); streamTaskCompleteCheckRsp(pInfo, true, id);
streamMetaReleaseTask(pMeta, pTask);
return; return;
} }
@ -565,6 +582,7 @@ void rspMonitorFn(void* param, void* tmrId) {
streamTaskCompleteCheckRsp(pInfo, false, id); streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamMetaReleaseTask(pMeta, pTask);
return; return;
} }
@ -591,6 +609,7 @@ void rspMonitorFn(void* param, void* tmrId) {
streamTaskCompleteCheckRsp(pInfo, false, id); streamTaskCompleteCheckRsp(pInfo, false, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamMetaReleaseTask(pMeta, pTask);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList); taosArrayDestroy(pTimeoutList);
@ -609,12 +628,14 @@ void rspMonitorFn(void* param, void* tmrId) {
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id; STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
} }
streamMetaReleaseTask(pMeta, pTask);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList); taosArrayDestroy(pTimeoutList);
return; return;

View File

@ -85,9 +85,10 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTaskState* p = streamTaskGetStatus(pTask); SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) {
streamMetaReleaseTask(pTask->pMeta, pTask);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return; return;
} }