Merge pull request #26381 from taosdata/fix/3_liaohj

fix(stream): set the initial checkpointId for a new created stream.
This commit is contained in:
Haojun Liao 2024-07-02 22:40:49 +08:00 committed by GitHub
commit 8bacca6644
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 18 additions and 1 deletions

View File

@ -153,6 +153,7 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numO
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);
int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId);
#ifdef __cplusplus
}

View File

@ -801,8 +801,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
// add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name);
mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid);
taosThreadMutexUnlock(&execInfo.lock);
// execute creation

View File

@ -930,6 +930,8 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTa
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0.
mDebug("existed consensus-checkpointId:%" PRId64 " for stream:0x%" PRIx64 " %s exist, and return",
pInfo->checkpointId, pStream->uid, pStream->name);
return pInfo->checkpointId;
}
@ -965,3 +967,16 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
numOfStreams);
return TSDB_CODE_SUCCESS;
}
int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) {
void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
ASSERT(pInfo == NULL);
SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL};
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0);
mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId);
return TSDB_CODE_SUCCESS;
}