Merge pull request #26526 from taosdata/fix/3_liaohj

fix(stream):commit the update of table meta.
This commit is contained in:
Haojun Liao 2024-07-12 15:11:51 +08:00 committed by GitHub
commit 58b44de576
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 16 additions and 5 deletions

View File

@ -59,7 +59,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
//static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg);
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
@ -966,8 +965,8 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return -1;
}
code =
setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY, 0);
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY,
TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) {
taosMemoryFree(buf);
}
@ -2317,6 +2316,8 @@ static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) {
}
taosHashCleanup(pHash);
mDebug("numOfNodes:%d for stream after extract nodeInfo from stream", (int32_t)taosArrayGetSize(pNodeList));
return TSDB_CODE_SUCCESS;
}
@ -2905,7 +2906,6 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
}
addAllStreamTasksIntoBuf(pMnode, pExecInfo);
extractNodeListFromStream(pMnode, pExecInfo->pNodeList);
pExecInfo->initTaskList = true;
}

View File

@ -255,7 +255,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mndInitStreamExecInfo(pMnode, &execInfo);
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
mError("invalid hbMsg from vgId:%d, discarded", req.vgId);
mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);
terrno = TSDB_CODE_INVALID_MSG;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);

View File

@ -182,6 +182,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
}
@ -201,6 +203,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
}
@ -210,6 +214,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
" discard",
id, vgId, pActiveInfo->activeId, checkpointId);
taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
} else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
@ -218,6 +224,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
"checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId);
taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
}
@ -232,6 +239,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return TSDB_CODE_SUCCESS;
}
}
@ -455,6 +463,9 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
id, vgId, pReq->taskId, numOfTasks);
}
streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
}
return TSDB_CODE_SUCCESS;