refactor(stream): allow ready stream to start checkpoint procedure.

This commit is contained in:
Haojun Liao 2025-02-10 16:08:18 +08:00
parent 9ad6fd170b
commit 6c7a8e9774
1 changed file with 25 additions and 39 deletions

View File

@ -1158,51 +1158,22 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
}
/*
 * Pre-check on the stream node/task bookkeeping kept in execInfo; a non-zero
 * result means the caller should not go ahead.
 *
 * As literally written below, the function returns:
 *   - TSDB_CODE_STREAM_TASK_IVLD_STATUS when mndStreamNodeIsUpdated(pMnode)
 *     reports true,
 *   - TSDB_CODE_FAILED when the node list is empty but the task list is not,
 *   - otherwise 0 if the scan left `ready` true, or -1 if it did not.
 *
 * NOTE(review): this listing looks like a commit diff rendered without +/-
 * markers, so lines from the removed and the added side appear next to each
 * other (two consecutive returns after the node-update check, and
 * `return ready ? 0 : -1;` directly followed by `return code;`). Read
 * literally, the second statement of each pair is unreachable. Confirm the
 * intended post-commit body against the repository before relying on this.
 */
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
int32_t code = 0;
if (mndStreamNodeIsUpdated(pMnode)) {
// NOTE(review): TAOS_RETURN presumably returns from the function, which would
// make the plain return below dead code -- looks like an old/new diff pair.
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// execInfo.lock is held while the node list and task list are inspected.
streamMutexLock(&execInfo.lock);
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
if (taosArrayGetSize(execInfo.pTaskList) != 0) {
// Release the lock before leaving through this early-return path.
streamMutexUnlock(&execInfo.lock);
mError("stream task node change checking done, no vgroups exist, but task list is not empty");
return TSDB_CODE_FAILED;
}
}
// Walk every task id; ids with no array slot or no entry in pTaskMap are
// skipped. The scan stops at the first task that is not READY or that still
// has a related fill-history task (hTaskId != 0).
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
if (p == NULL) {
continue;
}
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
ready = false;
break;
}
if (pEntry->hTaskId != 0) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
" exists, checkpoint not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
pEntry->hTaskId);
ready = false;
break;
// NOTE(review): the assignment below follows `break`, so it cannot execute
// here; it likely belongs to the added side of the diff -- confirm placement.
code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
}
streamMutexUnlock(&execInfo.lock);
// NOTE(review): two returns in a row; only the first can execute as written.
return ready ? 0 : -1;
return code;
}
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
@ -1216,7 +1187,22 @@ int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
continue;
}
if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
// -1 denote not ready now or never ready till now
if (pEntry->hTaskId != 0) {
mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s related fill-history task:0x%" PRIx64
" exists, checkpoint not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status),
pEntry->hTaskId);
return -1;
}
if (pEntry->status != TASK_STATUS__READY) {
mInfo("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, not ready for checkpoint", pEntry->id.streamId,
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
return -1;
}
if (ts < pEntry->startTime) {
ts = pEntry->startTime;
taskId = pEntry->id.taskId;
}
@ -1249,11 +1235,11 @@ static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
if (lastReadyTs != -1) {
mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold",
pStream->uid, lastReadyTs, now - lastReadyTs);
} else {
mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid);
mInfo("not start checkpoint, stream:0x%" PRIx64 " last ready ts:%" PRId64 " ready duration:%" PRId64
"ms less than threshold",
pStream->uid, lastReadyTs, (now - lastReadyTs));
}
ready = false;
@ -1274,7 +1260,7 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
TAOS_RETURN(TSDB_CODE_STREAM_TASK_IVLD_STATUS);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
SArray *pList = taosArrayInit(4, sizeof(SCheckpointInterval));