fix(stream): check stream task status before start checkpoint.
This commit is contained in:
parent
760dd48d7d
commit
130ad28d2c
|
@ -1168,8 +1168,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId));
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||||
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -1181,23 +1179,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEntry->status == TASK_STATUS__STOP) {
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) {
|
|
||||||
STaskId *pId = taosArrayGet(pInvalidList, j);
|
|
||||||
if (pId == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEntry->id.streamId == pId->streamId) {
|
|
||||||
void *px = taosArrayPush(pInvalidList, &pEntry->id);
|
|
||||||
if (px == NULL) {
|
|
||||||
mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pEntry->status != TASK_STATUS__READY) {
|
if (pEntry->status != TASK_STATUS__READY) {
|
||||||
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId,
|
||||||
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
(int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status));
|
||||||
|
@ -1215,9 +1196,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
removeTasksInBuf(pInvalidList, &execInfo);
|
|
||||||
taosArrayDestroy(pInvalidList);
|
|
||||||
|
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
return ready ? 0 : -1;
|
return ready ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
@ -1258,6 +1236,30 @@ static int32_t streamWaitComparFn(const void *p1, const void *p2) {
|
||||||
return pInt1->duration > pInt2->duration ? -1 : 1;
|
return pInt1->duration > pInt2->duration ? -1 : 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all tasks of this stream should be ready, otherwise do nothing
|
||||||
|
static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) {
|
||||||
|
bool ready = false;
|
||||||
|
|
||||||
|
streamMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
|
int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
|
||||||
|
if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) {
|
||||||
|
if (lastReadyTs != -1) {
|
||||||
|
mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold",
|
||||||
|
pStream->uid, lastReadyTs, now - lastReadyTs);
|
||||||
|
} else {
|
||||||
|
mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid);
|
||||||
|
}
|
||||||
|
|
||||||
|
ready = false;
|
||||||
|
} else {
|
||||||
|
ready = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
return ready;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -1284,20 +1286,17 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexLock(&execInfo.lock);
|
bool ready = isStreamReadyHelp(now, pStream);
|
||||||
int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid);
|
if (!ready) {
|
||||||
if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) {
|
|
||||||
streamMutexUnlock(&execInfo.lock);
|
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
streamMutexUnlock(&execInfo.lock);
|
|
||||||
|
|
||||||
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration};
|
||||||
void *p = taosArrayPush(pList, &in);
|
void *p = taosArrayPush(pList, &in);
|
||||||
if (p) {
|
if (p) {
|
||||||
int32_t currentSize = taosArrayGetSize(pList);
|
int32_t currentSize = taosArrayGetSize(pList);
|
||||||
mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64
|
mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64
|
||||||
"s), concurrently launch threshold:%d",
|
"s), concurrently launch threshold:%d",
|
||||||
pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
|
pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000,
|
||||||
tsMaxConcurrentCheckpoint);
|
tsMaxConcurrentCheckpoint);
|
||||||
|
|
Loading…
Reference in New Issue