fix(stream): add upper bound for the time of waiting for creating table.

This commit is contained in:
Haojun Liao 2025-01-23 13:18:56 +08:00
parent 9f2c448e36
commit e5e57923cd
2 changed files with 11 additions and 2 deletions

View File

@ -395,8 +395,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
tqNotifyClose(pVnode->pImpl->pTq);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains, ignore and continue close", pVnode->vgId,
pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
// while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

View File

@ -861,6 +861,8 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
int32_t vgId = TD_VID(pVnode);
int64_t suid = pTask->outputInfo.tbSink.stbUid;
const char* id = pTask->id.idStr;
int32_t timeout = 600; // 10min
int64_t start = taosGetTimestampSec();
while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(pTask)) {
@ -868,6 +870,12 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}
int64_t waitingDuration = taosGetTimestampSec() - start;
if (waitingDuration > timeout) {
tqError("s-task:%s wait for table-creating:%s more than %dsec, failed", id, dstTableName, timeout);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
// wait for the table to be created
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);