diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 47f3047364..169dcaf628 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -732,6 +732,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int8_t isSucceed); +void streamMetaNotifyClose(SStreamMeta* pMeta); + #ifdef __cplusplus } #endif diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 46ccec9f9e..b2325e41f0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -116,6 +116,7 @@ FAIL: } void sndClose(SSnode *pSnode) { + streamMetaNotifyClose(pSnode->pMeta); streamMetaCommit(pSnode->pMeta); streamMetaClose(pSnode->pMeta); taosMemoryFree(pSnode->path); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5733d06770..e892274839 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -171,70 +171,11 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } -static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { - bool inTimer = false; - - taosWLockLatch(&pMeta->lock); - - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->status.timerActive >= 1) { - inTimer = true; - } - } - - taosWUnLockLatch(&pMeta->lock); - return inTimer; -} - void tqNotifyClose(STQ* pTq) { if (pTq == NULL) { return; } - - SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = pMeta->vgId; - - tqDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId); - taosWLockLatch(&pMeta->lock); - - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - tqDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr); - streamTaskStop(pTask); - } - - taosWUnLockLatch(&pMeta->lock); - - // wait for the stream meta hb function stopping - pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; - while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { - taosMsleep(100); - tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); - } - - tqDebug("vgId:%d start to check all tasks", vgId); - int64_t st = taosGetTimestampMs(); - - while (hasStreamTaskInTimer(pMeta)) { - tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); - taosMsleep(100); - } - - int64_t el = taosGetTimestampMs() - st; - tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); + streamMetaNotifyClose(pTq->pStreamMeta); } int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { @@ -258,26 +199,6 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { return 0; } -// int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { -// SMqDataRsp dataRsp = {0}; -// dataRsp.head.consumerId = pHandle->consumerId; -// dataRsp.head.epoch = pHandle->epoch; -// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; -// -// int64_t sver = 0, ever = 0; -// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); -// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, -// ever); -// -// char buf1[TSDB_OFFSET_LEN] = {0}; -// char buf2[TSDB_OFFSET_LEN] = {0}; -// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); -// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); -// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, -// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); -// return 0; -// } - int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId) { int64_t sver = 0, ever = 0; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2da669ec46..83b9570db1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -775,4 +775,65 @@ void metaHbToMnode(void* param, void* tmrId) { tmsgSendReq(&epset, &msg); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosReleaseRef(streamMetaId, rid); +} + +static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { + bool inTimer = false; + + taosWLockLatch(&pMeta->lock); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->status.timerActive >= 1) { + inTimer = true; + } + } + + taosWUnLockLatch(&pMeta->lock); + return inTimer; +} + +void streamMetaNotifyClose(SStreamMeta* pMeta) { + int32_t vgId = pMeta->vgId; + + qDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId); + taosWLockLatch(&pMeta->lock); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + qDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr); + streamTaskStop(pTask); + } + + taosWUnLockLatch(&pMeta->lock); + + // wait for the stream meta hb function stopping + pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; + while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { + taosMsleep(100); + qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); + } + + qDebug("vgId:%d start to check all tasks", vgId); + int64_t st = taosGetTimestampMs(); + + while (hasStreamTaskInTimer(pMeta)) { + qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); + taosMsleep(100); + } + + int64_t el = taosGetTimestampMs() - st; + qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el); } \ No newline at end of file