fix(stream): check node status before start pause trans.

This commit is contained in:
Haojun Liao 2023-11-15 16:02:56 +08:00
parent f299bdb387
commit 1b1ad3068d
2 changed files with 62 additions and 43 deletions

View File

@ -140,8 +140,6 @@ typedef enum EStreamTaskEvent {
TASK_EVENT_RESUME = 0x9,
TASK_EVENT_HALT = 0xA,
TASK_EVENT_DROPPING = 0xB,
TASK_EVENT_SCAN_TSDB = 0xC,
TASK_EVENT_SCAN_WAL = 0xD,
} EStreamTaskEvent;
typedef struct {
@ -316,7 +314,6 @@ typedef struct SStreamStatus {
int8_t schedStatus;
int8_t keepTaskStatus;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t pauseAllowed; // allowed task status to be set to be paused
int32_t timerActive; // timer is active
int32_t inScanHistorySentinel;
} SStreamStatus;
@ -789,7 +786,6 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamRestoreParam(SStreamTask* pTask);
void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta);
void streamTaskResume(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask);
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);

View File

@ -1176,6 +1176,50 @@ static int32_t initStreamNodeList(SMnode* pMnode) {
return taosArrayGetSize(execInfo.pNodeList);
}
static bool taskNodeIsUpdated(SMnode* pMnode) {
// check if the node update happens or not
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
taosThreadMutexUnlock(&execInfo.lock);
return false;
}
for (int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
taosThreadMutexUnlock(&execInfo.lock);
return true;
}
}
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes ready");
taosArrayDestroy(pNodeSnapshot);
taosThreadMutexUnlock(&execInfo.lock);
return 0;
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) {
mDebug("stream task not ready due to node update");
}
taosThreadMutexUnlock(&execInfo.lock);
return nodeUpdated;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
@ -1183,50 +1227,23 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = NULL;
int32_t code = 0;
{ // check if the node update happens or not
int64_t ts = taosGetTimestampSec();
taosThreadMutexLock(&execInfo.lock);
int32_t numOfNodes = initStreamNodeList(pMnode);
taosThreadMutexUnlock(&execInfo.lock);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = ts;
return 0;
}
for(int32_t i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return 0;
}
}
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot);
return 0;
}
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot);
if (nodeUpdated) {
mDebug("stream task not ready due to node update, checkpoint not issued");
return 0;
}
// check if the node update happens or not
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mWarn("checkpoint ignore, stream task nodes update detected");
}
{ // check if all tasks are in TASK_STATUS__READY status
bool ready = true;
taosThreadMutexLock(&execInfo.lock);
// no streams exists, abort
int32_t numOfTasks = taosArrayGetSize(execInfo.pTaskList);
if (numOfTasks <= 0) {
taosThreadMutexUnlock(&execInfo.lock);
return 0;
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
@ -1799,6 +1816,12 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
bool updated = taskNodeIsUpdated(pMnode);
if (updated) {
mError("tasks are not ready for pause, node update detected");
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
if (pTrans == NULL) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());