From e5e57923cde80a4edabe978a7e4099c841322b54 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Jan 2025 13:18:56 +0800 Subject: [PATCH 1/4] fix(stream): add upper bound for the time of waiting for creating table. --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 5 +++-- source/dnode/vnode/src/tq/tqSink.c | 8 ++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 6d5e117181..199c0411b8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -395,8 +395,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); tqNotifyClose(pVnode->pImpl->pTq); - dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ); - while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); + dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains, ignore and continue close", pVnode->vgId, + pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ)); +// while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7ba77cf813..77d80269e6 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -861,6 +861,8 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI int32_t vgId = TD_VID(pVnode); int64_t suid = pTask->outputInfo.tbSink.stbUid; const char* id = pTask->id.idStr; + int32_t timeout = 600; // 10min + int64_t start = taosGetTimestampSec(); while (pTableSinkInfo->uid == 0) { if (streamTaskShouldStop(pTask)) { @@ -868,6 +870,12 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI return TSDB_CODE_STREAM_EXEC_CANCELLED; } + int64_t waitingDuration = taosGetTimestampSec() - start; + if (waitingDuration > timeout) { + tqError("s-task:%s wait for table-creating:%s more than %dsec, failed", id, dstTableName, timeout); + return TSDB_CODE_PAR_TABLE_NOT_EXIST; + } + // wait for the table to be created SMetaReader mr = {0}; metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK); From 6c4dc665e0a4520fa71e0fca804a444ca31f576a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Jan 2025 13:37:06 +0800 Subject: [PATCH 2/4] fix(stream): wait for queue to be empty. --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 199c0411b8..e7e536e2ac 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -395,9 +395,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); tqNotifyClose(pVnode->pImpl->pTq); - dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains, ignore and continue close", pVnode->vgId, + dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId, pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ)); -// while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId); From 6cc3b5da0e3a78dc3dbdfa029fbcf6c38912d766 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Jan 2025 13:37:41 +0800 Subject: [PATCH 3/4] fix(stream): add upper bound for the time of waiting for creating table. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 77d80269e6..1b2ff8cb57 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -861,7 +861,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI int32_t vgId = TD_VID(pVnode); int64_t suid = pTask->outputInfo.tbSink.stbUid; const char* id = pTask->id.idStr; - int32_t timeout = 600; // 10min + int32_t timeout = 300; // 5min int64_t start = taosGetTimestampSec(); while (pTableSinkInfo->uid == 0) { From 963e397984e2f02e8bb8e737558c70878327cda0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Jan 2025 14:45:47 +0800 Subject: [PATCH 4/4] fix(stream): update logs. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 1b2ff8cb57..4c2d0d7531 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -422,7 +422,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S reqs.nReqs = taosArrayGetSize(reqs.pArray); code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE); if (code != TSDB_CODE_SUCCESS) { - tqError("s-task:%s failed to send create table msg", id); + tqError("s-task:%s failed to send create table msg, code:%s", id, tstrerror(code)); } _end: