Merge pull request #29777 from taosdata/fix/disp_lost

fix(stream): add upper bound for the time of waiting for creating table.
This commit is contained in:
Shengliang Guan 2025-02-14 14:45:21 +08:00 committed by GitHub
commit ea16e5bed5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 2 deletions

View File

@ -396,7 +396,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
tqNotifyClose(pVnode->pImpl->pTq);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
dInfo("vgId:%d, wait for vnode stream queue:%p is empty, %d remains", pVnode->vgId,
pVnode->pStreamQ, taosQueueItemSize(pVnode->pStreamQ));
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);

View File

@ -422,7 +422,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
reqs.nReqs = taosArrayGetSize(reqs.pArray);
code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s failed to send create table msg", id);
tqError("s-task:%s failed to send create table msg, code:%s", id, tstrerror(code));
}
_end:
@ -861,6 +861,8 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
int32_t vgId = TD_VID(pVnode);
int64_t suid = pTask->outputInfo.tbSink.stbUid;
const char* id = pTask->id.idStr;
int32_t timeout = 300; // 5min
int64_t start = taosGetTimestampSec();
while (pTableSinkInfo->uid == 0) {
if (streamTaskShouldStop(pTask)) {
@ -868,6 +870,12 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_STREAM_EXEC_CANCELLED;
}
int64_t waitingDuration = taosGetTimestampSec() - start;
if (waitingDuration > timeout) {
tqError("s-task:%s wait for table-creating:%s more than %dsec, failed", id, dstTableName, timeout);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
// wait for the table to be created
SMetaReader mr = {0};
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);