refactor stream backend

This commit is contained in:
yihaoDeng 2023-10-23 14:06:25 +08:00
parent 1c3c58062d
commit 9d210ec957
1 changed files with 45 additions and 46 deletions

View File

@ -1119,36 +1119,30 @@ static const char *mndGetStreamDB(SMnode *pMnode) {
return p; return p;
} }
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { static int32_t mndCheckNodeStatus(SMnode *pMnode) {
SMnode *pMnode = pReq->info.node; bool ready = true;
SSdb *pSdb = pMnode->pSdb; // check if the node update happens or not
void *pIter = NULL; int64_t ts = taosGetTimestampSec();
SStreamObj *pStream = NULL;
int32_t code = 0;
{ // check if the node update happens or not if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
int64_t ts = taosGetTimestampSec(); if (execNodeList.pNodeEntryList != NULL) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
if (execNodeList.pNodeEntryList != NULL) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
}
execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode);
} }
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode);
mDebug("stream task node change checking done, no vgroups exist, do nothing"); }
execNodeList.ts = ts;
return 0;
}
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
SNodeEntry *pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i); mDebug("stream task node change checking done, no vgroups exist, do nothing");
if (pNodeEntry->stageUpdated) { execNodeList.ts = ts;
mDebug("stream task not ready due to node update detected, checkpoint not issued"); return -1;
return 0; }
}
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
return -1;
} }
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
@ -1161,34 +1155,39 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
if (nodeUpdated) { if (nodeUpdated) {
mDebug("stream task not ready due to node update, checkpoint not issued"); mDebug("stream task not ready due to node update, checkpoint not issued");
return 0; return -1;
} }
} }
{ // check if all tasks are in TASK_STATUS__NORMAL status // check if all tasks are in TASK_STATUS__NORMAL status
bool ready = true;
taosThreadMutexLock(&execNodeList.lock); taosThreadMutexLock(&execNodeList.lock);
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
STaskId *p = taosArrayGet(execNodeList.pTaskList, i); STaskId *p = taosArrayGet(execNodeList.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) { if (pEntry == NULL) {
continue; continue;
}
if (pEntry->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false;
break;
}
} }
taosThreadMutexUnlock(&execNodeList.lock);
if (!ready) { if (pEntry->status != TASK_STATUS__NORMAL) {
return 0; mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status));
ready = false;
break;
} }
} }
taosThreadMutexUnlock(&execNodeList.lock);
return ready == true ? 0 : -1;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
if ((code = mndCheckNodeStatus(pMnode)) != 0) {
return code;
}
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId; int64_t checkpointId = pMsg->checkpointId;