From 4802f59dfe4ee049351041be269962edb8d19c05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 27 Oct 2024 15:49:40 +0800 Subject: [PATCH] fix(stream): use the refId in stream meta list, in order to avoid access already freed stream tasks. --- source/dnode/snode/src/snode.c | 8 +- source/dnode/vnode/src/sma/smaRollup.c | 15 +- source/dnode/vnode/src/tq/tq.c | 8 +- source/dnode/vnode/src/tq/tqRead.c | 17 +- source/dnode/vnode/src/tq/tqStreamTask.c | 4 +- source/dnode/vnode/src/tq/tqUtil.c | 14 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 93 +++-- source/libs/stream/src/streamCheckStatus.c | 106 +++--- source/libs/stream/src/streamCheckpoint.c | 101 +++--- source/libs/stream/src/streamDispatch.c | 164 ++++++--- source/libs/stream/src/streamHb.c | 86 +++-- source/libs/stream/src/streamMeta.c | 382 ++++++++++---------- source/libs/stream/src/streamSched.c | 67 ++-- source/libs/stream/src/streamStartHistory.c | 273 +++++++------- source/libs/stream/src/streamStartTask.c | 65 ++-- source/libs/stream/src/streamTimer.c | 10 +- 16 files changed, 769 insertions(+), 644 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d61f3d80d3..e8d4663bbb 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -45,17 +45,17 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce char *p = streamTaskGetStatus(pTask).name; if (pTask->info.fillHistory) { - sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms", - SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + SNODE_HANDLE, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam); } else { - sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms", - SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + SNODE_HANDLE, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam); } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4fdf299e50..80c04a3276 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -238,13 +238,18 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui } static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) { - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask *pTask = NULL; + streamMetaRLock(pMeta); - SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask && *ppTask) { - pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer; - pItem->fetchResultVer = (*ppTask)->info.delaySchedParam; + + int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code == 0) { + pItem->submitReqVer = pTask->chkInfo.checkpointVer; + pItem->fetchResultVer = pTask->info.delaySchedParam; + streamMetaReleaseTask(pMeta, pTask); } + streamMetaRUnLock(pMeta); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b75baea08d..2929121029 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -774,19 +774,19 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); if (pTask->info.fillHistory) { - tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x " "delaySched:%" PRId64 " ms, inputVer:%" PRId64, - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer); } else { - tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 + tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64 " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x " "delaySched:%" PRId64 " ms, inputVer:%" PRId64, - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, + vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index f2f85773b5..9d9e7c431a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1115,13 +1115,18 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { break; } - SStreamTask* pTask = *(SStreamTask**)pIter; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) { - int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd); - if (code != 0) { - tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr); - continue; + int64_t refId = *(int64_t*)pIter; + SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId); + if (pTask != NULL) { + int32_t taskId = pTask->id.taskId; + + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) { + int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd); + if (code != 0) { + tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId); + } } + taosReleaseRef(streamTaskRefPool, refId); } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 3ec269ec22..24c892de8b 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -79,7 +79,7 @@ static void doStartScanWal(void* param, void* tmrId) { SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; - SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId); + SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); if (pMeta == NULL) { tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId); taosMemoryFree(pParam); @@ -97,7 +97,7 @@ static void doStartScanWal(void* param, void* tmrId) { tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } - code = taosReleaseRef(streamMetaId, pParam->metaId); + code = taosReleaseRef(streamMetaRefPool, pParam->metaId); if (code) { tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId, tstrerror(code)); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index b4866b8c65..5b51d6a94f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -683,19 +683,21 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b continue; } - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask == NULL) { + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask* pTask = NULL; + + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code != 0) { tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId); continue; } - if ((*ppTask)->info.taskLevel != TASK_LEVEL__SOURCE) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + streamMetaReleaseTask(pMeta, pTask); continue; } // here we get the required stream source task - SStreamTask* pTask = *ppTask; *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask); int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); @@ -711,6 +713,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0); if (pReader == NULL) { tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId); + streamMetaReleaseTask(pMeta, pTask); continue; } @@ -736,6 +739,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b } walCloseReader(pReader); + streamMetaReleaseTask(pMeta, pTask); } streamMetaRUnLock(pMeta); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a00e92997c..e8d929e4aa 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -138,13 +138,15 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream // this is to process request from transaction, always return true. int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { - int32_t vgId = pMeta->vgId; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; - int64_t st = taosGetTimestampMs(); - bool updated = false; - int32_t code = 0; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + int64_t st = taosGetTimestampMs(); + bool updated = false; + int32_t code = 0; + SStreamTask* pTask = NULL; + SStreamTask* pHTask = NULL; SStreamTaskNodeUpdateMsg req = {0}; @@ -170,9 +172,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaWLock(pMeta); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask == NULL || *ppTask == NULL) { + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code != 0) { tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId, req.taskId); rsp.code = TSDB_CODE_SUCCESS; @@ -181,12 +183,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM return rsp.code; } - SStreamTask* pTask = *ppTask; - const char* idstr = pTask->id.idStr; + const char* idstr = pTask->id.idStr; if (req.transId <= 0) { tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId); rsp.code = TSDB_CODE_SUCCESS; + + streamMetaReleaseTask(pMeta, pTask); streamMetaWUnLock(pMeta); taosArrayDestroy(req.pNodeList); @@ -227,24 +230,23 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); - SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); - if (ppHTask == NULL || *ppHTask == NULL) { + code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask); + if (code != 0) { tqError( "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel " "stream task:0x%x", vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } else { - tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); - bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr); + bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList); if (updateEpSet) { updated = updateEpSet; } - streamTaskResetStatus(*ppHTask); - streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); + streamTaskResetStatus(pHTask); + streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr); } } @@ -256,8 +258,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code)); } - if (ppHTask != NULL) { - code = streamMetaSaveTask(pMeta, *ppHTask); + if (pHTask != NULL) { + code = streamMetaSaveTask(pMeta, pHTask); if (code) { tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code)); } @@ -271,15 +273,17 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code)); } - if (ppHTask != NULL) { - code = streamTaskStop(*ppHTask); + if (pHTask != NULL) { + code = streamTaskStop(pHTask); if (code) { tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code)); } } // keep info - streamMetaAddIntoUpdateTaskList(pMeta, pTask, (ppHTask != NULL) ? (*ppHTask) : NULL, req.transId, st); + streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st); + streamMetaReleaseTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pHTask); rsp.code = TSDB_CODE_SUCCESS; @@ -643,7 +647,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve if (code < 0) { tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); - tFreeStreamTask(pTask); return code; } @@ -673,7 +676,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve } } else { tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks); - tFreeStreamTask(pTask); } return code; @@ -681,25 +683,25 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + STaskId hTaskId = {0}; + SStreamTask* pTask = NULL; - int32_t code = 0; - int32_t vgId = pMeta->vgId; - STaskId hTaskId = {0}; tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); streamMetaWLock(pMeta); - STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if ((ppTask != NULL) && ((*ppTask) != NULL)) { - int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask); - SStreamTask* pTask = *ppTask; - + STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code == 0) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { hTaskId.streamId = pTask->hTaskInfo.id.streamId; hTaskId.taskId = pTask->hTaskInfo.id.taskId; } + // clear the relationship, and then release the stream tasks, to avoid invalid accessing of already freed + // related stream(history) task streamTaskSetRemoveBackendFiles(pTask); code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt); streamMetaReleaseTask(pMeta, pTask); @@ -742,18 +744,19 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) { SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + SStreamTask* pTask = NULL; - int32_t code = 0; - int32_t vgId = pMeta->vgId; tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId); streamMetaWLock(pMeta); - STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - - if (ppTask != NULL && (*ppTask) != NULL) { - code = streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq); + STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code == 0) { + code = streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq); + streamMetaReleaseTask(pMeta, pTask); } else { // failed to get the task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); tqError( @@ -763,7 +766,6 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored } streamMetaWUnLock(pMeta); - // always return success when handling the requirement issued by mnode during transaction. return TSDB_CODE_SUCCESS; } @@ -789,11 +791,6 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId, pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs); - while (streamMetaTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - streamMetaWLock(pMeta); streamMetaClear(pMeta); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index c1c54b3c0b..60f8744448 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -299,13 +299,14 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { return; } - int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref); - streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, - "check-status-monitor"); + int64_t* pTaskRefId = NULL; + code = streamTaskAllocRefId(pTask, &pTaskRefId); + if (code == 0) { + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTaskRefId, streamTimer, &pInfo->checkRspTmr, vgId, + "check-status-monitor"); + } streamMutexUnlock(&pInfo->checkInfoLock); } @@ -721,21 +722,45 @@ int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64 return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK); } +static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) { + streamMetaReleaseTask(pTask->pMeta, pTask); + + taosArrayDestroy(pNotReadyList); + taosArrayDestroy(pTimeoutList); + streamTaskFreeRefId(param); +} + // this function is executed in timer thread void rspMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - SStreamMeta* pMeta = pTask->pMeta; - STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - int32_t vgId = pTask->pMeta->vgId; - int64_t now = taosGetTimestampMs(); - int64_t timeoutDuration = now - pInfo->timeoutStartTs; - const char* id = pTask->id.idStr; int32_t numOfReady = 0; int32_t numOfFault = 0; int32_t numOfNotRsp = 0; int32_t numOfNotReady = 0; int32_t numOfTimeout = 0; - int32_t total = taosArrayGetSize(pInfo->pList); + int64_t taskRefId = *(int64_t*)param; + int64_t now = taosGetTimestampMs(); + SArray* pNotReadyList = NULL; + SArray* pTimeoutList = NULL; + SStreamMeta* pMeta = NULL; + STaskCheckInfo* pInfo = NULL; + int32_t vgId = -1; + int64_t timeoutDuration = 0; + const char* id = NULL; + int32_t total = 0; + + SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); + if (pTask == NULL) { + stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + streamTaskFreeRefId(param); + return; + } + + pMeta = pTask->pMeta; + pInfo = &pTask->taskCheckInfo; + vgId = pTask->pMeta->vgId; + timeoutDuration = now - pInfo->timeoutStartTs; + id = pTask->id.idStr; + total = (int32_t) taosArrayGetSize(pInfo->pList); stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); @@ -744,12 +769,10 @@ void rspMonitorFn(void* param, void* tmrId) { streamMutexUnlock(&pTask->lock); if (state.state == TASK_STATUS__STOP) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref); - + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId); streamTaskCompleteCheckRsp(pInfo, true, id); - // not record the failed of the current task if try to close current vnode + // not record the failure of the current task if try to close current vnode // otherwise, the put of message operation may incur invalid read of message queue. if (!pMeta->closeFlag) { int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); @@ -758,33 +781,30 @@ void rspMonitorFn(void* param, void* tmrId) { } } - streamMetaReleaseTask(pMeta, pTask); + doCleanup(pTask, pNotReadyList, pTimeoutList, param); return; } if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId); streamTaskCompleteCheckRsp(pInfo, true, id); - streamMetaReleaseTask(pMeta, pTask); + doCleanup(pTask, pNotReadyList, pTimeoutList, param); return; } streamMutexLock(&pInfo->checkInfoLock); if (pInfo->notReadyTasks == 0) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, state.name, vgId, - ref); + stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId); streamTaskCompleteCheckRsp(pInfo, false, id); streamMutexUnlock(&pInfo->checkInfoLock); - streamMetaReleaseTask(pMeta, pTask); + doCleanup(pTask, pNotReadyList, pTimeoutList, param); return; } - SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t)); - SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); + pNotReadyList = taosArrayInit(4, sizeof(int64_t)); + pTimeoutList = taosArrayInit(4, sizeof(int64_t)); if (state.state == TASK_STATUS__UNINIT) { getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); @@ -795,31 +815,25 @@ void rspMonitorFn(void* param, void* tmrId) { // fault tasks detected, not try anymore bool jumpOut = false; if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError( "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, " - "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", + id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); jumpOut = true; } if (numOfFault > 0) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", + id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); jumpOut = true; } if (jumpOut) { streamTaskCompleteCheckRsp(pInfo, false, id); streamMutexUnlock(&pInfo->checkInfoLock); - streamMetaReleaseTask(pMeta, pTask); - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); + doCleanup(pTask, pNotReadyList, pTimeoutList, param); return; } } else { // unexpected status @@ -828,11 +842,10 @@ void rspMonitorFn(void* param, void* tmrId) { // checking of downstream tasks has been stopped by other threads if (pInfo->stopCheckProcess == 1) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, " - "notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "notReady:%d, fault:%d, timeout:%d, ready:%d", + id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); streamTaskCompleteCheckRsp(pInfo, false, id); streamMutexUnlock(&pInfo->checkInfoLock); @@ -842,10 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) { stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code)); } - streamMetaReleaseTask(pMeta, pTask); - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); + doCleanup(pTask, pNotReadyList, pTimeoutList, param); return; } @@ -857,7 +867,7 @@ void rspMonitorFn(void* param, void* tmrId) { handleTimeoutDownstreamTasks(pTask, pTimeoutList); } - streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId, + streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId, "check-status-monitor"); streamMutexUnlock(&pInfo->checkInfoLock); @@ -865,7 +875,5 @@ void rspMonitorFn(void* param, void* tmrId) { "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, " "ready:%d", id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); - - taosArrayDestroy(pNotReadyList); - taosArrayDestroy(pTimeoutList); + doCleanup(pTask, pNotReadyList, pTimeoutList, NULL); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 302090bb37..af2d8b559b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -345,13 +345,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); if (old == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); + stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr); - int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); - streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "trigger-recv-monitor"); - pTmrInfo->launchChkptId = pActiveInfo->activeId; + int64_t* pTaskRefId = NULL; + code = streamTaskAllocRefId(pTask, &pTaskRefId); + if (code == 0) { + streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId, + "trigger-recv-monitor"); + pTmrInfo->launchChkptId = pActiveInfo->activeId; + } } else { // already launched, do nothing stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr); } @@ -890,7 +892,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } -static int32_t doChkptStatusCheck(SStreamTask* pTask) { +static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) { const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -898,25 +900,24 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) { // checkpoint-trigger recv flag is set, quit if (pActiveInfo->allUpstreamTriggerRecv) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, - ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", id, vgId); return -1; } if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 - ", quit, ref:%d", - id, vgId, pTmrInfo->launchChkptId, ref); + ", quit", + id, vgId, pTmrInfo->launchChkptId); return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d", - id, vgId, ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", id, + vgId); return -1; } @@ -964,22 +965,22 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } -static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, SArray* pNotSendList) { +static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) { const char* id = pTask->id.idStr; SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; int32_t vgId = pTask->pMeta->vgId; - int32_t code = doChkptStatusCheck(pTask); + int32_t code = doChkptStatusCheck(pTask, param); if (code) { return code; } code = doFindNotSendUpstream(pTask, pList, &pNotSendList); if (code) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code)); return code; } @@ -993,36 +994,48 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, SArray* pNotSen return code; } +static void doCleanup(SStreamTask* pTask, SArray* pList) { + streamMetaReleaseTask(pTask->pMeta, pTask); + taosArrayDestroy(pList); +} + void checkpointTriggerMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - int64_t now = taosGetTimestampMs(); - const char* id = pTask->id.idStr; - SArray* pNotSendList = NULL; - SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg int32_t code = 0; int32_t numOfNotSend = 0; + SArray* pNotSendList = NULL; + int64_t taskRefId = *(int64_t*)param; + int64_t now = taosGetTimestampMs(); + SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); + if (pTask == NULL) { + stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + streamTaskFreeRefId(param); + return; + } + + int32_t vgId = pTask->pMeta->vgId; + const char* id = pTask->id.idStr; + SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id); + doCleanup(pTask, pNotSendList); return; } // check the status every 100ms if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId); + doCleanup(pTask, pNotSendList); return; } if (++pTmrInfo->activeCounter < 50) { - streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); return; } @@ -1035,20 +1048,19 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { streamMutexUnlock(&pTask->lock); if (state.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, - vgId, state.name, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id, + vgId, state.name); + doCleanup(pTask, pNotSendList); return; } streamMutexLock(&pActiveInfo->lock); - code = chkptTriggerRecvMonitorHelper(pTask, pNotSendList); + code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) { - streamMetaReleaseTask(pTask->pMeta, pTask); - taosArrayDestroy(pNotSendList); + doCleanup(pTask, pNotSendList); return; } @@ -1056,15 +1068,14 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { numOfNotSend = taosArrayGetSize(pNotSendList); if (numOfNotSend > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); } else { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); - streamMetaReleaseTask(pTask->pMeta, pTask); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id); } - taosArrayDestroy(pNotSendList); + doCleanup(pTask, pNotSendList); } int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 62d60ff664..ff41008759 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -518,45 +518,66 @@ static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int } } +static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) { + int32_t ret = taosReleaseRef(streamTaskRefPool, taskRefId); + if (ret) { + stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, taskRefId); + } + streamTaskFreeRefId(param); +} + static void doMonitorDispatchData(void* param, void* tmrId) { - SStreamTask* pTask = param; - const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; - SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; - int32_t msgId = pMsgInfo->msgId; int32_t code = 0; int64_t now = taosGetTimestampMs(); bool inDispatch = true; + SStreamTask* pTask = NULL; + int64_t taskRefId = *(int64_t*)param; + const char* id = NULL; + int32_t vgId = -1; + SDispatchMsgInfo* pMsgInfo = NULL; + int32_t msgId = -1; - stDebug("s-task:%s start monitor dispatch data", id); + pTask = taosAcquireRef(streamTaskRefPool, taskRefId); + if (pTask == NULL) { + stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + streamTaskFreeRefId(param); + return; + } + + id = pTask->id.idStr; + vgId = pTask->pMeta->vgId; + pMsgInfo = &pTask->msgInfo; + msgId = pMsgInfo->msgId; + + stDebug("s-task:%s start to monitor dispatch data", id); if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); setNotInDispatchMonitor(pMsgInfo); + cleanupInMonitor(pTask->id.taskId, taskRefId, param); return; } // slave task not handle the dispatch, downstream not ready will break the monitor timer // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr, ref:%d", id, vgId, ref); + stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr", id, vgId); setNotInDispatchMonitor(pMsgInfo); + cleanupInMonitor(pTask->id.taskId, taskRefId, param); return; } streamMutexLock(&pMsgInfo->lock); if (pTask->outputq.status == TASK_OUTPUT_STATUS__NORMAL) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref); - + stDebug("s-task:%s not in dispatch procedure, abort from timer", pTask->id.idStr); pMsgInfo->inMonitor = 0; inDispatch = false; } + streamMutexUnlock(&pMsgInfo->lock); if (!inDispatch) { + cleanupInMonitor(pTask->id.taskId, taskRefId, param); return; } @@ -564,6 +585,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { if (numOfFailed == 0) { stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); + cleanupInMonitor(pTask->id.taskId, taskRefId, param); return; } @@ -628,18 +650,23 @@ static void doMonitorDispatchData(void* param, void* tmrId) { } if (streamTaskShouldStop(pTask)) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); + stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); setNotInDispatchMonitor(pMsgInfo); } else { streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } + + cleanupInMonitor(pTask->id.taskId, taskRefId, param); } void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { - int32_t vgId = pTask->pMeta->vgId; - streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, - "dispatch-monitor"); + int32_t vgId = pTask->pMeta->vgId; + int64_t* pTaskRefId = NULL; + int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId); + if (code == 0) { + streamTmrStart(doMonitorDispatchData, waitDuration, pTaskRefId, streamTimer, &pTask->msgInfo.pRetryTmr, vgId, + "dispatch-monitor"); + } } static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, @@ -854,9 +881,9 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { streamMutexLock(&pTask->msgInfo.lock); if (pTask->msgInfo.inMonitor == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, - ref, tstrerror(code)); +// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, + tstrerror(code)); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); pTask->msgInfo.inMonitor = 1; } else { @@ -911,31 +938,31 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 return TSDB_CODE_SUCCESS; } -static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) { +static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t num) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + streamCleanBeforeQuitTmr(pTmrInfo, param); stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64 - ", quit, ref:%d", - id, vgId, pTmrInfo->launchChkptId, ref); + ", quit", + id, vgId, pTmrInfo->launchChkptId); return -1; } // active checkpoint info is cleared for now if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId); return -1; } if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id, - vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id, + vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num); return -1; } @@ -1011,7 +1038,7 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t } } -static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) { +static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pList = pActiveInfo->pReadyMsgList; @@ -1021,16 +1048,15 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) const char* id = pTask->id.idStr; int32_t notRsp = 0; - int32_t code = doTaskChkptStatusCheck(pTask, num); + int32_t code = doTaskChkptStatusCheck(pTask, param, num); if (code) { return code; } code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); if (code) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, - tstrerror(code), ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code)); return code; } @@ -1045,26 +1071,41 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) } static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { - SStreamTask* pTask = param; - int32_t vgId = pTask->pMeta->vgId; - const char* id = pTask->id.idStr; - SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pNotRspList = NULL; int32_t code = 0; int32_t notRsp = 0; + int64_t taskRefId = *(int64_t*)param; + int32_t vgId = -1; + const char* id = NULL; + SActiveCheckpointInfo* pActiveInfo = NULL; + SStreamTmrInfo* pTmrInfo = NULL; + + SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); + if (pTask == NULL) { + stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + streamTaskFreeRefId(param); + return; + } + + vgId = pTask->pMeta->vgId; + id = pTask->id.idStr; + pActiveInfo = pTask->chkInfo.pActiveInfo; + pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; // check the status every 100ms if (streamTaskShouldStop(pTask)) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger", id, vgId); streamMetaReleaseTask(pTask->pMeta, pTask); + taosArrayDestroy(pNotRspList); return; } if (++pTmrInfo->activeCounter < 50) { - streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); + streamMetaReleaseTask(pTask->pMeta, pTask); + taosArrayDestroy(pNotRspList); return; } @@ -1078,15 +1119,16 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { // 1. check status in the first place if (state.state != TASK_STATUS__CK) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId, - state.name, ref); + streamCleanBeforeQuitTmr(pTmrInfo, param); + stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready", id, vgId, + state.name); streamMetaReleaseTask(pTask->pMeta, pTask); + taosArrayDestroy(pNotRspList); return; } streamMutexLock(&pActiveInfo->lock); - code = chkptReadyMsgSendHelper(pTask, pNotRspList); + code = chkptReadyMsgSendHelper(pTask, param, pNotRspList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) { @@ -1098,18 +1140,18 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { notRsp = taosArrayGetSize(pNotRspList); if (notRsp > 0) { // send checkpoint-ready msg again stDebug("s-task:%s start to monitor checkpoint-ready msg recv status in 10s", id); - streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); } else { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + streamCleanBeforeQuitTmr(pTmrInfo, param); stDebug( "s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit " - "from timer, ref:%d", - id, vgId, ref); - // release should be the last execution, since pTask may be destroy after it immidiately. - streamMetaReleaseTask(pTask->pMeta, pTask); + "from timer", + id, vgId); } + // release should be the last execution, since pTask may be destroyed after it immediately. + streamMetaReleaseTask(pTask->pMeta, pTask); taosArrayDestroy(pNotRspList); } @@ -1160,15 +1202,17 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1); if (old == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); - int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); + stDebug("s-task:%s start checkpoint-ready monitor in 10s", pTask->id.idStr); - streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, - "chkpt-ready-monitor"); + int64_t* pTaskRefId = NULL; + int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId); + if (code == 0) { + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId, + "chkpt-ready-monitor"); - // mark the timer monitor checkpointId - pTmrInfo->launchChkptId = pActiveInfo->activeId; + // mark the timer monitor checkpointId + pTmrInfo->launchChkptId = pActiveInfo->activeId; + } } else { stError("s-task:%s previous checkpoint-ready monitor tmr is set, not start new one", pTask->id.idStr); } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 19391bf7a0..96e853630e 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -21,7 +21,7 @@ #include "ttimer.h" #include "wal.h" -int32_t streamMetaId = 0; +int32_t streamMetaRefPool = 0; struct SMetaHbInfo { tmr_h hbTmr; @@ -123,17 +123,21 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { for(int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (pTask == NULL) { + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask* pTask = NULL; + + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code != 0) { continue; } - if ((*pTask)->info.fillHistory == 1) { + if (pTask->info.fillHistory == 1) { + streamMetaReleaseTask(pMeta, pTask); continue; } - epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + epsetAssign(&epset, &pTask->info.mnodeEpset); + streamMetaReleaseTask(pMeta, pTask); break; } @@ -159,28 +163,30 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (pTask == NULL) { + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask* pTask = NULL; + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code != 0) { continue; } // not report the status of fill-history task - if ((*pTask)->info.fillHistory == 1) { + if (pTask->info.fillHistory == 1) { + streamMetaReleaseTask(pMeta, pTask); continue; } - streamMutexLock(&(*pTask)->lock); - STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask); - streamMutexUnlock(&(*pTask)->lock); + streamMutexLock(&pTask->lock); + STaskStatusEntry entry = streamTaskGetStatusEntry(pTask); + streamMutexUnlock(&pTask->lock); entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); - if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { - entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; - entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { + entry.sinkQuota = pTask->outputInfo.pTokenBucket->quotaRate; + entry.sinkDataSize = SIZE_IN_MiB(pTask->execInfo.sink.dataSize); } - SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo; + SActiveCheckpointInfo* p = pTask->chkInfo.pActiveInfo; if (p->activeId != 0) { entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0; entry.checkpointInfo.activeId = p->activeId; @@ -188,40 +194,42 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { if (entry.checkpointInfo.failed) { stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo", - (*pTask)->id.idStr, p->transId); + pTask->id.idStr, p->transId); - streamMutexLock(&(*pTask)->lock); - streamTaskClearCheckInfo((*pTask), true); - streamMutexUnlock(&(*pTask)->lock); + streamMutexLock(&pTask->lock); + streamTaskClearCheckInfo(pTask, true); + streamMutexUnlock(&pTask->lock); } } - streamMutexLock(&(*pTask)->lock); - entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(*pTask, pMsg->ts); + streamMutexLock(&pTask->lock); + entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(pTask, pMsg->ts); if (entry.checkpointInfo.consensusChkptId) { entry.checkpointInfo.consensusTs = pMsg->ts; } - streamMutexUnlock(&(*pTask)->lock); + streamMutexUnlock(&pTask->lock); - if ((*pTask)->exec.pWalReader != NULL) { - entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; + if (pTask->exec.pWalReader != NULL) { + entry.processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1; if (entry.processedVer < 0) { - entry.processedVer = (*pTask)->chkInfo.processedVer; + entry.processedVer = pTask->chkInfo.processedVer; } - walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); + walReaderValidVersionRange(pTask->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); } - addUpdateNodeIntoHbMsg(*pTask, pMsg); + addUpdateNodeIntoHbMsg(pTask, pMsg); p = taosArrayPush(pMsg->pTaskStatus, &entry); if (p == NULL) { - stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", (*pTask)->id.taskId, pMeta->vgId); + stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId); } if (!hasMnodeEpset) { - epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + epsetAssign(&epset, &pTask->info.mnodeEpset); hasMnodeEpset = true; } + + streamMetaReleaseTask(pMeta, pTask); } pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus); @@ -244,9 +252,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) { int32_t vgId = 0; int32_t role = 0; - SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); + SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, rid); if (pMeta == NULL) { - stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid); + stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", rid); +// taosMemoryFree(param); return; } @@ -256,24 +265,26 @@ void streamMetaHbToMnode(void* param, void* tmrId) { // need to stop, stop now if (pMeta->closeFlag) { pMeta->pHbInfo->hbStart = 0; - code = taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaRefPool, rid); if (code == TSDB_CODE_SUCCESS) { stDebug("vgId:%d jump out of meta timer", vgId); } else { stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } +// taosMemoryFree(param); return; } // not leader not send msg if (pMeta->role != NODE_ROLE_LEADER) { pMeta->pHbInfo->hbStart = 0; - code = taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaRefPool, rid); if (code == TSDB_CODE_SUCCESS) { stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role); } else { stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid); } +// taosMemoryFree(param); return; } @@ -281,7 +292,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) { streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, "meta-hb-tmr"); - code = taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaRefPool, rid); if (code) { stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } @@ -298,12 +309,13 @@ void streamMetaHbToMnode(void* param, void* tmrId) { if (code) { stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code)); } + streamMetaRUnLock(pMeta); streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); - code = taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaRefPool, rid); if (code) { stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7e9b60b61a..db46934e47 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "executor.h" #include "streamBackendRocksdb.h" #include "streamInt.h" #include "tmisce.h" @@ -28,6 +27,7 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; int32_t taskDbWrapperId = 0; +int32_t streamTaskRefPool = 0; static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); @@ -41,14 +41,14 @@ SMetaRefMgt gMetaRefMgt; int32_t metaRefMgtInit(); void metaRefMgtCleanup(); -int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); taskDbWrapperId = taosOpenRef(64, taskDbDestroy2); - streamMetaId = taosOpenRef(64, streamMetaCloseImpl); + streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl); + streamTaskRefPool = taosOpenRef(64, tFreeStreamTask); int32_t code = metaRefMgtInit(); if (code) { @@ -72,7 +72,8 @@ void streamMetaInit() { void streamMetaCleanup() { taosCloseRef(streamBackendId); taosCloseRef(streamBackendCfWrapperId); - taosCloseRef(streamMetaId); + taosCloseRef(streamMetaRefPool); + taosCloseRef(streamTaskRefPool); metaRefMgtCleanup(); streamTimerCleanUp(); @@ -98,16 +99,21 @@ int32_t metaRefMgtInit() { void metaRefMgtCleanup() { void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL); while (pIter) { - SArray* list = *(SArray**)pIter; - for (int i = 0; i < taosArrayGetSize(list); i++) { - void* rid = taosArrayGetP(list, i); - taosMemoryFree(rid); - } - taosArrayDestroy(list); + int64_t* p = *(int64_t**) pIter; + stInfo("---------------free refId:%"PRId64", %p", *p, p); + + taosMemoryFree(p); + +// SArray* list = *(SArray**)pIter; +// for (int i = 0; i < taosArrayGetSize(list); i++) { +// void* rid = taosArrayGetP(list, i); +// taosMemoryFree(rid); +// } +// taosArrayDestroy(list); pIter = taosHashIterate(gMetaRefMgt.pTable, pIter); } - taosHashCleanup(gMetaRefMgt.pTable); + taosHashCleanup(gMetaRefMgt.pTable); streamMutexDestroy(&gMetaRefMgt.mutex); } @@ -117,35 +123,31 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { streamMutexLock(&gMetaRefMgt.mutex); - p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId)); + p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid)); if (p == NULL) { - SArray* pList = taosArrayInit(8, POINTER_BYTES); - if (pList == NULL) { - return terrno; - } - - p = taosArrayPush(pList, &rid); - if (p == NULL) { - return terrno; - } - - code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &pList, sizeof(void*)); + code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*)); if (code) { stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid); return code; + } else { + stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid); } } else { - SArray* list = *(SArray**)p; - void* px = taosArrayPush(list, &rid); - if (px == NULL) { - code = terrno; - } + // todo } streamMutexUnlock(&gMetaRefMgt.mutex); return code; } +void metaRefMgtRemove(int64_t* pRefId) { + streamMutexLock(&gMetaRefMgt.mutex); + + taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId)); + stInfo("remove refId from mgt, refId:%"PRId64", %p", *pRefId, pRefId); + streamMutexUnlock(&gMetaRefMgt.mutex); +} + int32_t streamMetaOpenTdb(SStreamMeta* pMeta) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) { stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path); @@ -434,7 +436,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->closeFlag = false; stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); - pMeta->rid = taosAddRef(streamMetaId, pMeta); + pMeta->rid = taosAddRef(streamMetaRefPool, pMeta); // set the attribute when running on Linux OS TdThreadRwlockAttr attr; @@ -527,17 +529,20 @@ void streamMetaClear(SStreamMeta* pMeta) { // remove all existed tasks in this vnode void* pIter = NULL; while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) { - SStreamTask* p = *(SStreamTask**)pIter; + int64_t refId = *(int64_t*)pIter; + SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId); + if (p == NULL) { + continue; + } // release the ref by timer if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt); streamTmrStop(p->schedInfo.pDelayTimer); p->info.delaySchedParam = 0; - streamMetaReleaseTask(pMeta, p); } - streamMetaReleaseTask(pMeta, p); + taosRemoveRef(streamTaskRefPool, refId); } if (pMeta->streamBackendRid != 0) { @@ -567,9 +572,9 @@ void streamMetaClose(SStreamMeta* pMeta) { if (pMeta == NULL) { return; } - int32_t code = taosRemoveRef(streamMetaId, pMeta->rid); + int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid); if (code) { - stError("vgId:%d failed to remove ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code)); + stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code)); } } @@ -656,9 +661,16 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn); if (code != TSDB_CODE_SUCCESS) { code = terrno; - stError("s-task:%s vgId:%d task meta save to disk failed, code:%s", pTask->id.idStr, vgId, tstrerror(terrno)); + stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr, + vgId, pTask->id.refId, tstrerror(code)); + + int64_t refId = pTask->id.refId; + int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId); + if (ret != 0) { + stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id[1], refId); + } } else { - stDebug("s-task:%s vgId:%d task meta save to disk", pTask->id.idStr, vgId); + stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId); } taosMemoryFree(buf); @@ -683,34 +695,47 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa *pAdded = false; int32_t code = 0; + int64_t refId = 0; STaskId id = streamTaskGetTaskId(pTask); void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (p != NULL) { stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId); + tFreeStreamTask(pTask); return code; } if ((code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver)) != 0) { + tFreeStreamTask(pTask); return code; } p = taosArrayPush(pMeta->pTaskList, &pTask->id); if (p == NULL) { stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); + tFreeStreamTask(pTask); return terrno; } - code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); + pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask); + code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)); if (code) { stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); + + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); + if (ret != 0) { + stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId); + } return code; } if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + taosRemoveRef(streamTaskRefPool, refId); return code; } if ((code = streamMetaCommit(pMeta)) != 0) { + taosRemoveRef(streamTaskRefPool, refId); return code; } @@ -733,16 +758,72 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { } int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) { - STaskId id = {.streamId = streamId, .taskId = taskId}; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask == NULL || streamTaskShouldStop(*ppTask)) { - *pTask = NULL; + QRY_PARAM_CHECK(pTask); + STaskId id = {.streamId = streamId, .taskId = taskId}; + int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (pTaskRefId == NULL) { return TSDB_CODE_STREAM_TASK_NOT_EXIST; } - int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); - stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); - *pTask = *ppTask; + SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId); + if (p == NULL) { + stDebug("s-task:%x failed to acquire task refId:%"PRId64", may have been destoried", taskId, *pTaskRefId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + if (p->id.refId != *pTaskRefId) { + stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, *pTaskRefId, + p->id.refId); + int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId); + if (ret) { + stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId); + } + + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + if (streamTaskShouldStop(p)) { + stDebug("s-task:%s is stopped, failed to acquire it now", p->id.idStr); + int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId); + if (ret) { + stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId); + } + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId); + *pTask = p; + return TSDB_CODE_SUCCESS; +} + +int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) { + QRY_PARAM_CHECK(pTask); + int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + + if (pTaskRefId == NULL) { + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId); + if (p == NULL) { + stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId, + *pTaskRefId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + if (p->id.refId != *pTaskRefId) { + stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId, + *pTaskRefId, p->id.refId); + int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId); + if (ret) { + stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, *pTaskRefId); + } + + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId); + *pTask = p; return TSDB_CODE_SUCCESS; } @@ -753,28 +834,17 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task return code; } -int32_t streamMetaAcquireOneTask(SStreamTask* pTask) { - int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref); - return ref; -} - void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { if (pTask == NULL) { return; } int32_t taskId = pTask->id.taskId; - int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); - - // not safe to use the pTask->id.idStr, since pTask may be released by other threads when print logs. - if (ref > 0) { - stTrace("s-task:0x%x release task, ref:%d", taskId, ref); - } else if (ref == 0) { - stTrace("s-task:0x%x all refs are gone, free it", taskId); - tFreeStreamTask(pTask); - } else if (ref < 0) { - stError("task ref is invalid, ref:%d, 0x%x", ref, taskId); + int64_t refId = pTask->id.refId; + stDebug("s-task:0x%x release task, refId:%" PRId64, taskId, pTask->id.refId); + int32_t ret = taosReleaseRef(streamTaskRefPool, pTask->id.refId); + if (ret) { + stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId); } } @@ -812,13 +882,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t int32_t code = 0; STaskId id = {.streamId = streamId, .taskId = taskId}; - // pre-delete operation streamMetaWLock(pMeta); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask) { - pTask = *ppTask; - + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code == 0) { // desc the paused task counter if (streamTaskShouldPause(pTask)) { int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); @@ -830,43 +897,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t if (code) { stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code)); } - } else { - stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId); - streamMetaWUnLock(pMeta); - return 0; - } - streamMetaWUnLock(pMeta); + stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId); - stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId); - - while (1) { - int32_t timerActive = 0; - - streamMetaRLock(pMeta); - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask) { - // to make sure check status will not start the check downstream status when we start to check timerActive count. - streamMutexLock(&pTask->taskCheckInfo.checkInfoLock); - timerActive = (*ppTask)->status.timerActive; - streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock); - } - streamMetaRUnLock(pMeta); - - if (timerActive > 0) { - taosMsleep(100); - stDebug("s-task:0x%" PRIx64 " wait for quit from timer", id.taskId); - } else { - break; - } - } - - // let's do delete of stream task - streamMetaWLock(pMeta); - - ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask) { - pTask = *ppTask; // it is a fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); @@ -884,21 +917,22 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t if (sizeInList != size) { stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size); } - streamMetaWUnLock(pMeta); - - int32_t numOfTmr = pTask->status.timerActive; - if (numOfTmr != 0) { - stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, vgId, numOfTmr); - } if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); streamTmrStop(pTask->schedInfo.pDelayTimer); pTask->info.delaySchedParam = 0; - streamMetaReleaseTask(pMeta, pTask); + } + + + int64_t refId = pTask->id.refId; + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); + if (ret != 0) { + stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId); } streamMetaReleaseTask(pMeta, pTask); + streamMetaWUnLock(pMeta); } else { stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId); streamMetaWUnLock(pMeta); @@ -1008,13 +1042,13 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { return; } + vgId = pMeta->vgId; pRecycleList = taosArrayInit(4, sizeof(STaskId)); if (pRecycleList == NULL) { stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId); return; } - vgId = pMeta->vgId; stInfo("vgId:%d load stream tasks from meta files", vgId); code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL); @@ -1058,9 +1092,9 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { int32_t taskId = pTask->id.taskId; - tFreeStreamTask(pTask); - STaskId id = streamTaskGetTaskId(pTask); + + tFreeStreamTask(pTask); void* px = taosArrayPush(pRecycleList, &id); if (px == NULL) { stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId); @@ -1096,13 +1130,22 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) { - stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno)); - void* px = taosArrayPop(pMeta->pTaskList); - tFreeStreamTask(pTask); + pTask->id.refId = taosAddRef(streamTaskRefPool, pTask); + + if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) { + int64_t refId = pTask->id.refId; + stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue", + pTask->id.taskId, tstrerror(terrno), refId); + + void* px = taosArrayPop(pMeta->pTaskList); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); + if (ret != 0) { + stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId); + } continue; } + stInfo("s-task:0x%x vgId:%d set refId:%"PRId64, (int32_t) id.taskId, vgId, pTask->id.refId); if (pTask->info.fillHistory == 0) { int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } @@ -1138,72 +1181,22 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { } } -bool streamMetaTaskInTimer(SStreamMeta* pMeta) { - bool inTimer = false; - streamMetaRLock(pMeta); - - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pMeta->pTasksMap, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->status.timerActive >= 1) { - stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); - int32_t code = streamTaskStop(pTask); - if (code) { - stError("s-task:%s failed to stop task, code:%s", pTask->id.idStr, tstrerror(code)); - } - inTimer = true; - } - } - - streamMetaRUnLock(pMeta); - return inTimer; -} - void streamMetaNotifyClose(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; int64_t startTs = 0; int32_t sendCount = 0; - streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount); + streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount); stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d", vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount); // wait for the stream meta hb function stopping streamMetaWaitForHbTmrQuit(pMeta); - - streamMetaWLock(pMeta); - pMeta->closeFlag = true; - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pMeta->pTasksMap, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); - int32_t code = streamTaskStop(pTask); - if (code) { - stError("vgId:%d failed to stop task:0x%x, code:%s", vgId, pTask->id.taskId, tstrerror(code)); - } - } - - streamMetaWUnLock(pMeta); stDebug("vgId:%d start to check all tasks for closing", vgId); int64_t st = taosGetTimestampMs(); - while (streamMetaTaskInTimer(pMeta)) { - stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - streamMetaRLock(pMeta); SArray* pTaskList = NULL; @@ -1211,14 +1204,34 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { if (code != TSDB_CODE_SUCCESS) { } - streamMetaRUnLock(pMeta); + int32_t numOfTasks = taosArrayGetSize(pTaskList); + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = NULL; - if (pTaskList != NULL) { - taosArrayDestroy(pTaskList); + code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + if (code != TSDB_CODE_SUCCESS) { + continue; + } + + int64_t refId = pTask->id.refId; + int32_t ret = streamTaskStop(pTask); + if (ret) { + stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret)); + } + + streamMetaReleaseTask(pMeta, pTask); + ret = taosRemoveRef(streamTaskRefPool, refId); + if (ret) { + stError("vgId:%d failed to remove task:0x%x, refId:%"PRId64, pMeta->vgId, pTaskId->taskId, refId); + } } - int64_t el = taosGetTimestampMs() - st; - stDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); + taosArrayDestroy(pTaskList); + + double el = (taosGetTimestampMs() - st) / 1000.0; + stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el); + streamMetaRUnLock(pMeta); } void streamMetaStartHb(SStreamMeta* pMeta) { @@ -1228,12 +1241,12 @@ void streamMetaStartHb(SStreamMeta* pMeta) { return; } + *pRid = pMeta->rid; int32_t code = metaRefMgtAdd(pMeta->vgId, pRid); if (code) { return; } - *pRid = pMeta->rid; streamMetaHbToMnode(pRid, NULL); } @@ -1308,13 +1321,15 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { for (int32_t i = 0; i < num; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (ppTask == NULL) { - continue; - } + SStreamTask* pTask = NULL; + int32_t code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask); - if ((*ppTask)->status.downstreamReady == 0) { - return false; + if (code == 0) { + if (pTask->status.downstreamReady == 0) { + streamMetaReleaseTask((SStreamMeta*)pMeta, pTask); + return false; + } + streamMetaReleaseTask((SStreamMeta*)pMeta, pTask); } } @@ -1331,10 +1346,13 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - - STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - streamTaskResetStatus(*pTask); + STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + SStreamTask* pTask = NULL; + int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code == 0) { + streamTaskResetStatus(pTask); + streamMetaReleaseTask(pMeta, pTask); + } } return 0; @@ -1343,7 +1361,7 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) { void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs) { const char* id = pTask->id.idStr; - int32_t vgId = pTask->pMeta->vgId; + int32_t vgId = pMeta->vgId; int32_t code = 0; // keep the already updated info diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index cdaa603e38..7c77797ef9 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -22,13 +22,13 @@ static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { int64_t delaySchema = pTask->info.delaySchedParam; if (delaySchema != 0 && pTask->info.fillHistory == 0) { - int32_t ref = streamMetaAcquireOneTask(pTask); - stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, - pTask->info.delaySchedParam); - - streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, - pTask->pMeta->vgId, "sched-tmr"); - pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; + int64_t* pTaskRefId = NULL; + int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId); + if (code == 0) { + streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTaskRefId, streamTimer, + &pTask->schedInfo.pDelayTimer, pTask->pMeta->vgId, "sched-tmr"); + pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; + } } } @@ -75,49 +75,65 @@ void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleT void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; } void streamTaskResumeInFuture(SStreamTask* pTask) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr, - pTask->status.schedIdleTime, ref); + stDebug("s-task:%s task should idle, add into timer to retry in %dms", pTask->id.idStr, + pTask->status.schedIdleTime); // add one ref count for task - int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); - streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, - pTask->pMeta->vgId, "resume-task-tmr"); + int64_t* pTaskRefId = NULL; + int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId); + if (code == 0) { + streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTaskRefId, streamTimer, + &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr"); + } } ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void streamTaskResumeHelper(void* param, void* tmrId) { - SStreamTask* pTask = (SStreamTask*)param; + int32_t code = 0; + int64_t taskRefId = *(int64_t*)param; + SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); + if (pTask == NULL) { + stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + streamTaskFreeRefId(param); + return; + } + SStreamTaskId* pId = &pTask->id; SStreamTaskState p = streamTaskGetStatus(pTask); - int32_t code = 0; if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) { int8_t status = streamTaskSetSchedStatusInactive(pTask); TAOS_UNUSED(status); - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref); - + stDebug("s-task:%s status:%s not resume task", pId->idStr, p.name); streamMetaReleaseTask(pTask->pMeta, pTask); + streamTaskFreeRefId(param); return; } code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); if (code) { - stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref); + stError("s-task:%s sched task failed, code:%s", pId->idStr, tstrerror(code)); } else { - stDebug("trigger to resume s-task:%s after idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref); + stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime); // release the task ref count streamTaskClearSchedIdleInfo(pTask); - streamMetaReleaseTask(pTask->pMeta, pTask); } + + streamMetaReleaseTask(pTask->pMeta, pTask); + streamTaskFreeRefId(param); } void streamTaskSchedHelper(void* param, void* tmrId) { - SStreamTask* pTask = (void*)param; + int64_t taskRefId = *(int64_t*)param; + SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); + if (pTask == NULL) { + stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + streamTaskFreeRefId(param); + return; + } + const char* id = pTask->id.idStr; int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam; int32_t vgId = pTask->pMeta->vgId; @@ -127,6 +143,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { stDebug("s-task:%s should stop, jump out of schedTimer", id); + streamMetaReleaseTask(pTask->pMeta, pTask); + streamTaskFreeRefId(param); return; } @@ -171,6 +189,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } _end: - streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); + streamMetaReleaseTask(pTask->pMeta, pTask); } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 4d7bf2ba87..54026f5db2 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -15,6 +15,7 @@ #include "streamInt.h" #include "streamsm.h" +#include "tref.h" #include "trpc.h" #include "ttimer.h" #include "wal.h" @@ -24,7 +25,7 @@ #define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE) typedef struct SLaunchHTaskInfo { - SStreamMeta* pMeta; + int64_t metaRid; STaskId id; STaskId hTaskId; } SLaunchHTaskInfo; @@ -89,7 +90,7 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { // add ref for task SStreamTask* p = NULL; - int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p); + int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p); if (p == NULL || code != 0) { stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId, streamTaskGetStatus(pTask).name); @@ -98,10 +99,13 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { pTask->schedHistoryInfo.numOfTicks = numOfTicks; - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref); - streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, vgId, "history-task"); + stDebug("s-task:%s scan-history resumed in %.2fs", pTask->id.idStr, numOfTicks * 0.1); + int64_t* pTaskRefId = NULL; + int32_t ret = streamTaskAllocRefId(pTask, &pTaskRefId); + if (ret == 0) { + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTaskRefId, streamTimer, + &pTask->schedHistoryInfo.pTimer, vgId, "history-task"); + } } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { @@ -220,42 +224,32 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { // Set the execution conditions, including the query time window and the version range streamMetaRLock(pMeta); - SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); + SStreamTask* pHisTask = NULL; + code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHisTask); streamMetaRUnLock(pMeta); - if (pHTask != NULL) { // it is already added into stream meta store. - SStreamTask* pHisTask = NULL; - code = streamMetaAcquireTask(pMeta, hStreamId, hTaskId, &pHisTask); - if (pHisTask == NULL) { - stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); - code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); + if (code == 0) { // it is already added into stream meta store. + if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing + stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); + code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); if (code) { stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code)); } - } else { - if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing - stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); - code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); - if (code) { - stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code)); - } - } else { // exist, but not ready, continue check downstream task status - if (pHisTask->pBackend == NULL) { - code = pMeta->expandTaskFn(pHisTask); - if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pHisTask, now); - stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code)); - } - } - - if (code == TSDB_CODE_SUCCESS) { - checkFillhistoryTaskStatus(pTask, pHisTask); + } else { // exist, but not ready, continue check downstream task status + if (pHisTask->pBackend == NULL) { + code = pMeta->expandTaskFn(pHisTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pHisTask, now); + stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code)); } } - streamMetaReleaseTask(pMeta, pHisTask); + if (code == TSDB_CODE_SUCCESS) { + checkFillhistoryTaskStatus(pTask, pHisTask); + } } + streamMetaReleaseTask(pMeta, pHisTask); return code; } else { return launchNotBuiltFillHistoryTask(pTask); @@ -296,14 +290,14 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); +// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); if (code) { stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code)); } else { - stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", - pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref); + stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x", + pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId); } pHTaskInfo->id.taskId = 0; @@ -315,9 +309,9 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; if (streamTaskShouldStop(pTask)) { // record the failure - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64 ", ref:%d", pInfo->id.taskId, - pInfo->hTaskId.taskId, ref); +// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId, + pInfo->hTaskId.taskId); int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); if (code) { @@ -336,30 +330,60 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i } } +static void doCleanup(SStreamTask* pTask, int64_t metaRid, SLaunchHTaskInfo* pInfo) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t vgId = pMeta->vgId; + + streamMetaReleaseTask(pMeta, pTask); + int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid); + if (ret) { + stError("vgId:%d failed to release meta refId:%"PRId64, vgId, metaRid); + } + + if (pInfo != NULL) { + taosMemoryFree(pInfo); + } +} + void tryLaunchHistoryTask(void* param, void* tmrId) { SLaunchHTaskInfo* pInfo = param; - SStreamMeta* pMeta = pInfo->pMeta; + int64_t metaRid = pInfo->metaRid; int64_t now = taosGetTimestampMs(); int32_t code = 0; + SStreamTask* pTask = NULL; + int32_t vgId = 0; + + SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, metaRid); + if (pMeta == NULL) { + stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", metaRid); + taosMemoryFree(pInfo); + return; + } + + vgId = pMeta->vgId; streamMetaWLock(pMeta); - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id)); - if (ppTask == NULL || *ppTask == NULL) { + code = streamMetaAcquireTaskUnsafe(pMeta, &pInfo->id, &pTask); + if (code != 0) { stError("s-task:0x%x and rel fill-history task:0x%" PRIx64 " all have been destroyed, not launch", (int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId); streamMetaWUnLock(pMeta); + int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid); + if (ret) { + stError("vgId:%d failed to release meta refId:%"PRId64, vgId, metaRid); + } + // already dropped, no need to set the failure info into the stream task meta. taosMemoryFree(pInfo); return; } - if (streamTaskShouldStop(*ppTask)) { - char* p = streamTaskGetStatus(*ppTask).name; - int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); - stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", - (*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref); + if (streamTaskShouldStop(pTask)) { + char* p = streamTaskGetStatus(pTask).name; + stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d", pTask->id.idStr, p, + pTask->hTaskInfo.retryTimes); streamMetaWUnLock(pMeta); @@ -369,77 +393,54 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { stError("s-task:0x%" PRId64 " failed to record the start task status, code:%s", pInfo->hTaskId.taskId, tstrerror(code)); } - taosMemoryFree(pInfo); + + doCleanup(pTask, metaRid, pInfo); return; } - SStreamTask* pTask = NULL; - code = streamMetaAcquireTaskNoLock(pMeta, pInfo->id.streamId, pInfo->id.taskId, &pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo - } streamMetaWUnLock(pMeta); - if (pTask != NULL) { - SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; - - pHTaskInfo->tickCount -= 1; - if (pHTaskInfo->tickCount > 0) { - streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, - pTask->pMeta->vgId, " start-history-task-tmr"); - streamMetaReleaseTask(pMeta, pTask); - return; - } - - if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { - notRetryLaunchFillHistoryTask(pTask, pInfo, now); - } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. - streamTaskSetRetryInfoForLaunch(pHTaskInfo); - if (pTask->status.timerActive < 1) { - stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive); - return; - } - - // abort the timer if intend to stop task - SStreamTask* pHTask = NULL; - code = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, &pHTask); - if (pHTask == NULL) { - doRetryLaunchFillHistoryTask(pTask, pInfo, now); - streamMetaReleaseTask(pMeta, pTask); - return; - } else { - if (pHTask->pBackend == NULL) { - code = pMeta->expandTaskFn(pHTask); - if (code != TSDB_CODE_SUCCESS) { - streamMetaAddFailedTaskSelf(pHTask, now); - stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code)); - } - } - - if (code == TSDB_CODE_SUCCESS) { - checkFillhistoryTaskStatus(pTask, pHTask); - // not in timer anymore - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, - pHTaskInfo->retryTimes, ref); - } - streamMetaReleaseTask(pMeta, pHTask); - } - } - - streamMetaReleaseTask(pMeta, pTask); - } else { - code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); - if (code) { - stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code)); - } - - int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); - stError("s-task:0x%x rel fill-history task:0x%" PRIx64 " may have been destroyed, not launch, ref:%d", - (int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId, ref); + SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; + pHTaskInfo->tickCount -= 1; + if (pHTaskInfo->tickCount > 0) { + streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer, + pTask->pMeta->vgId, " start-history-task-tmr"); + doCleanup(pTask, metaRid, NULL); + return; } - taosMemoryFree(pInfo); + if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { + notRetryLaunchFillHistoryTask(pTask, pInfo, now); + } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. + streamTaskSetRetryInfoForLaunch(pHTaskInfo); + + // abort the timer if intend to stop task + SStreamTask* pHTask = NULL; + code = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, &pHTask); + if (pHTask == NULL) { + doRetryLaunchFillHistoryTask(pTask, pInfo, now); + doCleanup(pTask, metaRid, NULL); + return; + } else { + if (pHTask->pBackend == NULL) { + code = pMeta->expandTaskFn(pHTask); + if (code != TSDB_CODE_SUCCESS) { + streamMetaAddFailedTaskSelf(pHTask, now); + stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code)); + } + } + + if (code == TSDB_CODE_SUCCESS) { + checkFillhistoryTaskStatus(pTask, pHTask); + // not in timer anymore + stDebug("s-task:0x%x fill-history task launch completed, retry times:%d", (int32_t)pInfo->id.taskId, + pHTaskInfo->retryTimes); + } + streamMetaReleaseTask(pMeta, pHTask); + } + } + + doCleanup(pTask, metaRid, pInfo); } int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId, @@ -455,7 +456,7 @@ int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStr (*pInfo)->hTaskId.streamId = hStreamId; (*pInfo)->hTaskId.taskId = hTaskId; - (*pInfo)->pMeta = pMeta; + (*pInfo)->metaRid = pMeta->rid; return TSDB_CODE_SUCCESS; } @@ -485,12 +486,10 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { // check for the timer if (pTask->hTaskInfo.pTimer == NULL) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); if (pTask->hTaskInfo.pTimer == NULL) { - ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref); + stError("s-task:%s failed to start timer, related fill-history task not launched", idStr); taosMemoryFree(pInfo); code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); @@ -500,18 +499,8 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { return terrno; } - if (ref < 1) { - stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive); - return TSDB_CODE_STREAM_INTERNAL_ERROR; - } - - stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref); + stDebug("s-task:%s set timer active flag", idStr); } else { // timer exists - if (pTask->status.timerActive < 1) { - stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive); - return TSDB_CODE_STREAM_INTERNAL_ERROR; - } - stDebug("s-task:%s set timer active flag, task timer not null", idStr); streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); @@ -590,15 +579,22 @@ int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) { } void doExecScanhistoryInFuture(void* param, void* tmrId) { - SStreamTask* pTask = param; + int64_t taskRefId = *(int64_t*) param; + + SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId); + if (pTask == NULL) { + stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId); + streamTaskFreeRefId(param); + return; + } + pTask->schedHistoryInfo.numOfTicks -= 1; SStreamTaskState p = streamTaskGetStatus(pTask); if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) { - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p.name, ref); - + stDebug("s-task:%s status:%s not start scan-history again", pTask->id.idStr, p.name); streamMetaReleaseTask(pTask->pMeta, pTask); + streamTaskFreeRefId(param); return; } @@ -608,16 +604,19 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { stError("s-task:%s async start history task failed", pTask->id.idStr); } - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr, - pTask->info.fillHistory, ref); - - // release the task. - streamMetaReleaseTask(pTask->pMeta, pTask); + stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr", pTask->id.idStr, + pTask->info.fillHistory); } else { - streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, - &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); + int64_t* pTaskRefId = NULL; + int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId); + if (code == 0) { + streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTaskRefId, streamTimer, + &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); + } } + + streamMetaReleaseTask(pTask->pMeta, pTask); + streamTaskFreeRefId(param); } int32_t doStartScanHistoryTask(SStreamTask* pTask) { diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 0858f57414..ee9117ddc8 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -196,19 +196,17 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 STaskId id = {.streamId = streamId, .taskId = taskId}; int32_t vgId = pMeta->vgId; bool allRsp = true; + SStreamTask* p = NULL; streamMetaWLock(pMeta); - SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (p == NULL) { // task does not exists in current vnode, not record the complete info + int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &p); + if (code != 0) { // task does not exist in current vnode, not record the complete info stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId); streamMetaWUnLock(pMeta); return 0; } - // clear the send consensus-checkpointId flag -// streamMutexLock(&(*p)->lock); -// (*p)->status.sendConsensusChkptId = false; -// streamMutexUnlock(&(*p)->lock); + streamMetaReleaseTask(pMeta, p); if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; @@ -222,7 +220,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; - int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); + code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { if (code == TSDB_CODE_DUP_KEY) { stError("vgId:%d record start task result failed, s-task:0x%" PRIx64 @@ -296,13 +294,14 @@ void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) { while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) { STaskInitTs* pInfo = pIter; void* key = taosHashGetKey(pIter, &keyLen); - - SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId)); - if (pTask1 == NULL) { - stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); + SStreamTask* pTask = NULL; + int32_t code = streamMetaAcquireTaskUnsafe(pMeta, key, &pTask); + if (code == 0) { + stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", pTask->id.idStr, + pTask->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + streamMetaReleaseTask(pMeta, pTask); } else { - stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr, - (*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed"); + stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed"); } } } @@ -417,8 +416,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { streamMetaRLock(pMeta); + SArray* pTaskList = NULL; int32_t num = taosArrayGetSize(pMeta->pTaskList); stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); + if (num == 0) { stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num); streamMetaRUnLock(pMeta); @@ -428,14 +429,12 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { int64_t st = taosGetTimestampMs(); // send hb msg to mnode before closing all tasks. - SArray* pTaskList = NULL; int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); if (code != TSDB_CODE_SUCCESS) { return code; } int32_t numOfTasks = taosArrayGetSize(pTaskList); - for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = NULL; @@ -445,11 +444,17 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { continue; } + int64_t refId = pTask->id.refId; int32_t ret = streamTaskStop(pTask); if (ret) { stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret)); } + streamMetaReleaseTask(pMeta, pTask); + ret = taosRemoveRef(streamTaskRefPool, refId); + if (ret) { + stError("vgId:%d failed to remove task:0x%x, refId:%"PRId64, pMeta->vgId, pTaskId->taskId, refId); + } } taosArrayDestroy(pTaskList); @@ -466,6 +471,7 @@ int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) { int32_t vgId = pTask->pMeta->vgId; if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) { + // mark the sending of req consensus checkpoint request. pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND; pConChkptInfo->statusTs = ts; stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, @@ -473,6 +479,8 @@ int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) { return 1; } else { int32_t el = (ts - pConChkptInfo->statusTs) / 1000; + + // not recv consensus-checkpoint rsp for 60sec, send it again in hb to mnode if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) { pConChkptInfo->statusTs = ts; @@ -492,7 +500,7 @@ void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t pInfo->status = TASK_CONSEN_CHKPT_RECV; pInfo->statusTs = ts; - stDebug("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId); + stInfo("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId); } void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) { @@ -507,23 +515,24 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) { } int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t now = taosGetTimestampMs(); - int64_t startTs = 0; - bool hasFillhistoryTask = false; - STaskId hId = {0}; + int32_t code = TSDB_CODE_SUCCESS; + int64_t now = taosGetTimestampMs(); + int64_t startTs = 0; + bool hasFillhistoryTask = false; + STaskId hId = {0}; + STaskId id = {.streamId = streamId, .taskId = taskId}; + SStreamTask* pTask = NULL; stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId); streamMetaRLock(pMeta); - STaskId id = {.streamId = streamId, .taskId = taskId}; - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - - if (ppTask != NULL) { - startTs = (*ppTask)->taskCheckInfo.startTs; - hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask); - hId = (*ppTask)->hTaskInfo.id; + code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code == 0) { + startTs = pTask->taskCheckInfo.startTs; + hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask); + hId = pTask->hTaskInfo.id; + streamMetaReleaseTask(pMeta, pTask); streamMetaRUnLock(pMeta); diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 0da9acfd1d..848e9c874e 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -66,15 +66,9 @@ void streamTmrStop(tmr_h tmrId) { } } -int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) { +void streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, void* param) { pInfo->activeCounter = 0; pInfo->launchChkptId = 0; atomic_store_8(&pInfo->isActive, 0); - - int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - if (ref < 0) { - stFatal("invalid task timer ref value:%d, %s", ref, pTask->id.idStr); - } - - return ref; + streamTaskFreeRefId(param); } \ No newline at end of file