fix(stream): set the initial checkpointId for a new created stream.
This commit is contained in:
parent
c7d4f45b7f
commit
c73a003ee3
|
@ -153,6 +153,7 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numO
|
|||
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
|
||||
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId);
|
||||
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);
|
||||
int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -801,8 +801,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
// add into buffer firstly
|
||||
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name);
|
||||
mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
|
||||
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
|
||||
mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid);
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
||||
// execute creation
|
||||
|
|
|
@ -930,6 +930,8 @@ bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTa
|
|||
|
||||
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
|
||||
if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0.
|
||||
mDebug("existed consensus-checkpointId:%" PRId64 " for stream:0x%" PRIx64 " %s exist, and return",
|
||||
pInfo->checkpointId, pStream->uid, pStream->name);
|
||||
return pInfo->checkpointId;
|
||||
}
|
||||
|
||||
|
@ -965,3 +967,16 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
|
|||
numOfStreams);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) {
|
||||
void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
|
||||
ASSERT(pInfo == NULL);
|
||||
|
||||
SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL};
|
||||
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
|
||||
|
||||
SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
|
||||
ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0);
|
||||
mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
Loading…
Reference in New Issue