diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0fa32acfbb..1b6202f09b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -789,6 +789,7 @@ void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta); bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); void streamMetaRLock(SStreamMeta* pMeta); +int32_t streamMetaTryRlock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); diff --git a/source/dnode/mnode/impl/src/mndConfig.c b/source/dnode/mnode/impl/src/mndConfig.c index 099fff7aee..1e69ae2b5a 100644 --- a/source/dnode/mnode/impl/src/mndConfig.c +++ b/source/dnode/mnode/impl/src/mndConfig.c @@ -299,6 +299,8 @@ _OVER: } sdbRelease(pMnode->pSdb, vObj); cfgArrayCleanUp(array); + + tFreeSConfigReq(&configReq); return code; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 4c2d0d7531..239a8e65b5 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -1418,6 +1418,7 @@ int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_ code = setDstTableDataUid(pVnode, pTask, pDataBlock, stbFullName, &tbData); if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d s-task:%s dst-table not exist, stb:%s discard stream results", vgId, id, stbFullName); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); return code; } @@ -1429,12 +1430,14 @@ int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_ tbData.pCreateTbReq = NULL; } + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); return code; } void* p = taosArrayPush(submitReq.aSubmitTbData, &tbData); if (p == NULL) { tqDebug("vgId:%d, s-task:%s failed to build submit msg, code:%s, data lost", vgId, id, tstrerror(terrno)); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); return terrno; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 7c65f805e9..d9aec27759 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -148,9 +148,13 @@ static void doStartScanWal(void* param, void* tmrId) { return; } - streamMetaRLock(pMeta); - numOfTasks = taosArrayGetSize(pMeta->pTaskList); - streamMetaRUnLock(pMeta); + code = streamMetaTryRlock(pMeta); + if (code == 0) { + numOfTasks = taosArrayGetSize(pMeta->pTaskList); + streamMetaRUnLock(pMeta); + } else { + numOfTasks = 0; + } if (numOfTasks == 0) { goto _end; @@ -169,7 +173,6 @@ static void doStartScanWal(void* param, void* tmrId) { } _end: - streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal"); tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 387010d0a6..eb8f2c741a 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1585,10 +1585,13 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { pTask->pBackend = NULL; } + streamMetaWLock(pTask->pMeta); if (pTask->exec.pExecutor != NULL) { qDestroyTask(pTask->exec.pExecutor); pTask->exec.pExecutor = NULL; } + streamMetaWUnLock(pTask->pMeta); + return 0; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 90ebd47ac6..ca5b6630fd 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -363,13 +363,24 @@ void streamMetaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbStart = taosGetTimestampMs(); } - streamMetaRLock(pMeta); - code = streamMetaSendHbHelper(pMeta); - if (code) { - stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code)); + // NOTE: stream task in restart procedure. not generate the hb now, try to acquire the lock may cause stuck this timer. + int32_t count = 30; + bool send = false; + while ((--count) >= 0) { + int32_t ret = streamMetaTryRlock(pMeta); + if (ret != 0) { + taosMsleep(10); + } else { + send = true; + code = streamMetaSendHbHelper(pMeta); + streamMetaRUnLock(pMeta); + break; + } } - streamMetaRUnLock(pMeta); + if (!send) { + stError("vgId:%d failed to send hmMsg to mnode, retry again in 5s, code:%s", pMeta->vgId, tstrerror(code)); + } streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index d8a1c68c07..7c7ef83d6d 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -144,21 +144,11 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem // let's try the ordinary input q pQueue->qItem = NULL; - int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem); - if (code) { - stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code)); - } + int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem); if (pQueue->qItem == NULL) { - code = taosReadAllQitems(pQueue->pQueue, pQueue->qall); - if (code) { - stError("s-task:%s failed to get all items in inputq, code:%s", id, tstrerror(code)); - } - - code = taosGetQitem(pQueue->qall, &pQueue->qItem); - if (code) { - stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code)); - } + num = taosReadAllQitems(pQueue->pQueue, pQueue->qall); + num = taosGetQitem(pQueue->qall, &pQueue->qItem); } *pItem = streamQueueCurItem(pQueue); diff --git a/source/libs/stream/src/streamUtil.c b/source/libs/stream/src/streamUtil.c index 4c481e6041..11e291c876 100644 --- a/source/libs/stream/src/streamUtil.c +++ b/source/libs/stream/src/streamUtil.c @@ -54,6 +54,15 @@ void streamMetaRUnLock(SStreamMeta* pMeta) { } } +int32_t streamMetaTryRlock(SStreamMeta* pMeta) { + int32_t code = taosThreadRwlockTryRdlock(&pMeta->lock); + if (code) { + stError("vgId:%d try meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code)); + } + + return code; +} + void streamMetaWLock(SStreamMeta* pMeta) { // stTrace("vgId:%d meta-wlock", pMeta->vgId); int32_t code = taosThreadRwlockWrlock(&pMeta->lock);