diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4f485eb2ee..cf7cda4f34 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -97,6 +97,11 @@ enum { STREAM_QUEUE__PROCESSING, }; +enum { + STREAM_META_WILL_STOP = 1, + STREAM_META_OK_TO_STOP = 2, +}; + typedef struct { int8_t type; } SStreamQueueItem; @@ -389,8 +394,8 @@ typedef struct SStreamMeta { SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; tmr_h hbTmr; -// SMgmtInfo mgmtInfo; + int32_t killed; int32_t closedTask; int32_t chkptNotReadyTasks; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c72ddbf8b9..5961821445 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -738,7 +738,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; // if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 251e8bc37d..5fe2bd448c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2088,7 +2088,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { - // todo + return code; } } @@ -2160,6 +2160,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) { // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { + int32_t code = 0; int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { mDebug("still in checking node change"); @@ -2189,16 +2190,18 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { - mndProcessVgroupChange(pMnode, &changeInfo); + code = mndProcessVgroupChange(pMnode, &changeInfo); } taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); // keep the new vnode snapshot - taosArrayDestroy(execNodeList.pNodeEntryList); - execNodeList.pNodeEntryList = pNodeSnapshot; - execNodeList.ts = ts; + if (code == TSDB_CODE_SUCCESS) { + taosArrayDestroy(execNodeList.pNodeEntryList); + execNodeList.pNodeEntryList = pNodeSnapshot; + execNodeList.ts = ts; + } mDebug("end to do stream task node change checking"); atomic_store_32(&mndNodeCheckSentinel, 0); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5ef750b2b0..96e1385ce2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -190,7 +190,6 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) { } taosWUnLockLatch(&pMeta->lock); - return inTimer; } @@ -219,6 +218,12 @@ void tqNotifyClose(STQ* pTq) { taosWUnLockLatch(&pMeta->lock); + pMeta->killed = STREAM_META_WILL_STOP; + while(pMeta->killed != STREAM_META_OK_TO_STOP) { + taosMsleep(100); + tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); + } + tqDebug("vgId:%d start to check all tasks", vgId); int64_t st = taosGetTimestampMs(); @@ -1432,7 +1437,6 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { return code; } -// NOTE: the rsp msg should be kept in WAL file. int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1461,20 +1465,6 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " "completed msg", pTask->id.idStr, req.downstreamId); - - // the scan-history finish status should be recorded in the WAL files. So the transfer of the task status from - // scan-history - // to normal should be executed by write thread of each vnode. - -// void* buf = NULL; -// int32_t tlen = 0; -// // encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); -// -// SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen}; -// if (tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { -// tqError("failed to put into write-queue since %s", terrstr()); -// } - streamProcessScanHistoryFinishRsp(pTask); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index bb9146468f..2e175de0b8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -691,7 +691,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) } void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { - // blockDebugShowDataBlocks(data, __func__); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 032ce1d608..3d5e0a8f29 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -362,7 +362,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { streamTaskEnablePause(pStreamTask); if (taosQueueEmpty(pStreamTask->inputQueue->queue)) { SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); - ; + SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); pDelBlock->info.rows = 0; pDelBlock->info.version = 0; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4cf67c255d..ac2f7e02df 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -607,6 +607,12 @@ void metaHbToMnode(void* param, void* tmrId) { SStreamMeta* pMeta = param; SStreamHbMsg hbMsg = {0}; + if (pMeta->killed == STREAM_META_WILL_STOP) { + pMeta->killed = STREAM_META_OK_TO_STOP; + qDebug("vgId:%d jump out of meta timer", pMeta->vgId); + return; + } + taosRLockLatch(&pMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c33c5f40cd..029a71ed7b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -477,7 +477,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { pTask->status.taskStatus = TASK_STATUS__STOP; qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); - while (pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE) { + while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */!streamTaskIsIdle(pTask)) { qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel); taosMsleep(100); }