refactor(stream): delay checkpointInterval to generate the checkpoint after stream started.
This commit is contained in:
parent
4cfecfcf28
commit
674acd0e9f
|
@ -1134,6 +1134,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
|||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
||||
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
||||
ready = false;
|
||||
break;
|
||||
}
|
||||
|
||||
if (pEntry->hTaskId != 0) {
|
||||
|
@ -1153,6 +1154,27 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
|||
return ready ? 0 : -1;
|
||||
}
|
||||
|
||||
int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) {
|
||||
int64_t ts = -1;
|
||||
int32_t taskId = -1;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
|
||||
STaskId *p = taosArrayGet(pTaskList, i);
|
||||
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
||||
if (pEntry == NULL || pEntry->id.streamId != streamId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) {
|
||||
ts = pEntry->startTime;
|
||||
taskId = pEntry->id.taskId;
|
||||
}
|
||||
}
|
||||
|
||||
mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId);
|
||||
return ts;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int64_t duration;
|
||||
|
@ -1191,6 +1213,15 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
|||
continue;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&execInfo.lock);
|
||||
int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
|
||||
if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
sdbRelease(pSdb, pStream);
|
||||
continue;
|
||||
}
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
||||
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
||||
taosArrayPush(pList, &in);
|
||||
|
||||
|
|
Loading…
Reference in New Issue