Merge pull request #23707 from taosdata/fix/pause_stream

fix(stream): check the status before pause
This commit is contained in:
Haojun Liao 2023-11-15 18:00:48 +08:00 committed by GitHub
commit f37150fbd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 41 deletions

View File

@ -140,8 +140,6 @@ typedef enum EStreamTaskEvent {
TASK_EVENT_RESUME = 0x9,
TASK_EVENT_HALT = 0xA,
TASK_EVENT_DROPPING = 0xB,
TASK_EVENT_SCAN_TSDB = 0xC,
TASK_EVENT_SCAN_WAL = 0xD,
} EStreamTaskEvent;
typedef struct {

View File

@ -1156,6 +1156,50 @@ static int32_t initStreamNodeList(SMnode* pMnode) {
return taosArrayGetSize(execInfo.pNodeList);
}
// Report whether stream tasks must NOT be operated on right now because the set of nodes
// tracked in execInfo.pNodeList has changed, or cannot be verified yet.
//
// Returns true when:
//   - any tracked node entry has its stageUpdated flag set, or
//   - the vgroup snapshot reports that not all vnodes are ready (the node list cannot be
//     validated, so callers must back off instead of proceeding), or
//   - mndFindChangedNodeInfo() yields a non-empty list of updated nodes.
// Returns false when there are no nodes at all, or when no change was found.
//
// execInfo.lock is held for the whole check and released on every path through the single
// exit label below.
static bool taskNodeIsUpdated(SMnode* pMnode) {
  bool nodeUpdated = false;

  taosThreadMutexLock(&execInfo.lock);

  int32_t numOfNodes = initStreamNodeList(pMnode);
  if (numOfNodes == 0) {
    mDebug("stream task node change checking done, no vgroups exist, do nothing");
    execInfo.ts = taosGetTimestampSec();
  } else {
    // a pending stage update on any node is enough to declare the node list stale
    for (int32_t i = 0; i < numOfNodes; ++i) {
      SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
      if (pNodeEntry->stageUpdated) {
        mDebug("stream task not ready due to node update detected, checkpoint not issued");
        nodeUpdated = true;
        break;
      }
    }

    if (!nodeUpdated) {
      // NOTE(review): allReady is presumably cleared by mndTakeVgroupSnapshot() through the
      // out-parameter when some vnode is not ready -- confirm against its definition.
      bool    allReady = true;
      SArray* pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);

      if (!allReady) {
        // The previous inline check in the checkpoint handler skipped the checkpoint in this
        // situation. Reporting "no update" here would let callers proceed against vnodes that
        // are not ready, so report true to make them back off.
        mWarn("not all vnodes ready");
        nodeUpdated = true;
      } else {
        SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
        nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);

        taosArrayDestroy(changeInfo.pUpdateNodeList);
        taosHashCleanup(changeInfo.pDBMap);

        if (nodeUpdated) {
          mDebug("stream task not ready due to node update");
        }
      }

      taosArrayDestroy(pNodeSnapshot);
    }
  }

  taosThreadMutexUnlock(&execInfo.lock);
  return nodeUpdated;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
@ -1163,50 +1207,24 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = NULL;
int32_t code = 0;
{ // check if the node update happens or not
int64_t ts = taosGetTimestampSec();
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
taosThreadMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts;
return 0;
}
for(int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return 0;
}
}
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot);
return 0;
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) {
mDebug("stream task not ready due to node update, checkpoint not issued");
return 0;
}
// check if the node update happens or not
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mWarn("checkpoint ignore, stream task nodes update detected");
return -1;
}
{ // check if all tasks are in TASK_STATUS__READY status
bool ready = true;
taosThreadMutexLock(&execInfo.lock);
// no streams exists, abort
int32_t numOfTasks = taosArrayGetSize(execInfo.pTaskList);
if (numOfTasks <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
@ -1762,6 +1780,12 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());