diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 068f0a282e..5327428f5b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -771,6 +771,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask); // timer tmr_h streamTimerGetInstance(); +void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, + const char* pMsg); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 84eb6b319f..bcfe2c4243 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -54,7 +54,7 @@ static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { SStreamMeta* pMeta = pTask->pMeta; - taosThreadMutexLock(&pTask->lock); + (void) taosThreadMutexLock(&pTask->lock); int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); for (int j = 0; j < num; ++j) { @@ -62,14 +62,18 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { bool exist = existInHbMsg(pMsg, pTaskEpset); if (!exist) { - taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); + void* p = taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); + if (p == NULL) { + stError("failed to set the updateNode info in hbMsg, vgId:%d", pMeta->vgId); + } + stDebug("vgId:%d nodeId:%d added into hbMsg update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); } } taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList); - taosThreadMutexUnlock(&pTask->lock); + (void) taosThreadMutexUnlock(&pTask->lock); } static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) { @@ -101,9 +105,7 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p SRpcMsg msg = {0}; initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); - tmsgSendReq(pEpset, &msg); - - return TSDB_CODE_SUCCESS; + return tmsgSendReq(pEpset, &msg); } // NOTE: this task should be executed within the SStreamMeta lock region. @@ -112,6 +114,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { bool hasMnodeEpset = false; int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); SMetaHbInfo* pInfo = pMeta->pHbInfo; + int32_t code = 0; // not recv the hb msg rsp yet, send current hb msg again if (pInfo->msgSendTs > 0) { @@ -135,8 +138,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } pInfo->msgSendTs = taosGetTimestampMs(); - doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset); - return TSDB_CODE_SUCCESS; + return doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset); } SStreamHbMsg* pMsg = &pInfo->hbMsg; @@ -168,9 +170,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { continue; } - taosThreadMutexLock(&(*pTask)->lock); + (void) taosThreadMutexLock(&(*pTask)->lock); STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask); - taosThreadMutexUnlock(&(*pTask)->lock); + (void) taosThreadMutexUnlock(&(*pTask)->lock); entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { @@ -188,9 +190,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo", (*pTask)->id.idStr, p->transId); - taosThreadMutexLock(&(*pTask)->lock); + (void) taosThreadMutexLock(&(*pTask)->lock); streamTaskClearCheckInfo((*pTask), true); - taosThreadMutexUnlock(&(*pTask)->lock); + (void) taosThreadMutexUnlock(&(*pTask)->lock); } } @@ -210,7 +212,11 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } addUpdateNodeIntoHbMsg(*pTask, pMsg); - taosArrayPush(pMsg->pTaskStatus, &entry); + p = taosArrayPush(pMsg->pTaskStatus, &entry); + if (p == NULL) { + stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", (*pTask)->id.taskId, pMeta->vgId); + } + if (!hasMnodeEpset) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); hasMnodeEpset = true; @@ -221,18 +227,19 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { if (hasMnodeEpset) { pInfo->msgSendTs = taosGetTimestampMs(); - doSendHbMsgInfo(pMsg, pMeta, &epset); + code = doSendHbMsgInfo(pMsg, pMeta, &epset); } else { stDebug("vgId:%d no tasks or no mnd epset, not send stream hb to mnode", pMeta->vgId); tCleanupStreamHbMsg(&pInfo->hbMsg); pInfo->msgSendTs = -1; } - return TSDB_CODE_SUCCESS; + return code; } void streamMetaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; + int32_t code = 0; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { @@ -243,15 +250,25 @@ void streamMetaHbToMnode(void* param, void* tmrId) { // need to stop, stop now if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; - stDebug("vgId:%d jump out of meta timer", pMeta->vgId); - taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaId, rid); + if (code == TSDB_CODE_SUCCESS) { + stDebug("vgId:%d jump out of meta timer", pMeta->vgId); + } else { + stError("vgId:%d jump out of meta timer, failed to release the meta rid:%d", pMeta->vgId, rid); + } return; } // not leader not send msg if (pMeta->role != NODE_ROLE_LEADER) { - stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); - taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaId, rid); + if (code == TSDB_CODE_SUCCESS) { + stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); + } else { + stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%d", pMeta->vgId, + pMeta->role, rid); + } + pMeta->pHbInfo->hbStart = 0; return; } @@ -262,17 +279,30 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { - taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); - taosReleaseRef(streamMetaId, rid); + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + "meta-hb-tmr"); + + code = taosReleaseRef(streamMetaId, rid); + if (code) { + stError("vgId:%d in meta timer, failed to release the meta rid:%d", pMeta->vgId, rid); + } return; } streamMetaRLock(pMeta); - streamMetaSendHbHelper(pMeta); - streamMetaRUnLock(pMeta); + code = streamMetaSendHbHelper(pMeta); + if (code) { + stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, strerror(code)); + } - taosTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); - taosReleaseRef(streamMetaId, rid); + streamMetaRUnLock(pMeta); + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + "meta-hb-tmr"); + + code = taosReleaseRef(streamMetaId, rid); + if (code) { + stError("vgId:%d in meta timer, failed to release the meta rid:%d", pMeta->vgId, rid); + } } int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { @@ -297,7 +327,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) { tCleanupStreamHbMsg(&pInfo->hbMsg); if (pInfo->hbTmr != NULL) { - taosTmrStop(pInfo->hbTmr); + (void) taosTmrStop(pInfo->hbTmr); pInfo->hbTmr = NULL; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a2b1d39e0c..0a627034b1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1106,7 +1106,12 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { *pList = NULL; + int32_t code = 0; SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + if (pTaskList == NULL) { + stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId); + return TSDB_CODE_OUT_OF_MEMORY; + } bool sendMsg = pMeta->sendMsgBeforeClosing; if (!sendMsg) { @@ -1122,8 +1127,8 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = NULL; - int32_t code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); - if (code != TSDB_CODE_SUCCESS) { + code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask); + if (code != TSDB_CODE_SUCCESS) { // this error is ignored continue; } @@ -1140,9 +1145,9 @@ int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) { streamMetaReleaseTask(pMeta, pTask); } - streamMetaSendHbHelper(pMeta); + code = streamMetaSendHbHelper(pMeta); pMeta->sendMsgBeforeClosing = false; - return TSDB_CODE_SUCCESS; + return code; } void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) { diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 6e956e2682..c76ec92e33 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -38,3 +38,14 @@ void streamTimerCleanUp() { tmr_h streamTimerGetInstance() { return streamTimer; } + +void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, + const char* pMsg) { + while (1) { + bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); + if (ret) { + break; + } + stError("vgId:%d failed to reset %s, try again", vgId, pMsg); + } +}