refactor: do some internal refactor.
This commit is contained in:
parent
21e98e358e
commit
74572dd6a7
|
@ -42,14 +42,14 @@ typedef struct SNodeEntry {
|
||||||
int64_t hbTimestamp; // second
|
int64_t hbTimestamp; // second
|
||||||
} SNodeEntry;
|
} SNodeEntry;
|
||||||
|
|
||||||
typedef struct SStreamExecNodeInfo {
|
typedef struct SStreamExecInfo {
|
||||||
SArray *pNodeEntryList;
|
SArray *pNodeEntryList;
|
||||||
int64_t ts; // snapshot ts
|
int64_t ts; // snapshot ts
|
||||||
int64_t activeCheckpoint; // active check point id
|
int64_t activeCheckpoint; // active check point id
|
||||||
SHashObj *pTaskMap;
|
SHashObj *pTaskMap;
|
||||||
SArray *pTaskList;
|
SArray *pTaskList;
|
||||||
TdThreadMutex lock;
|
TdThreadMutex lock;
|
||||||
} SStreamExecNodeInfo;
|
} SStreamExecInfo;
|
||||||
|
|
||||||
typedef struct SVgroupChangeInfo {
|
typedef struct SVgroupChangeInfo {
|
||||||
SHashObj *pDBMap;
|
SHashObj *pDBMap;
|
||||||
|
@ -57,7 +57,7 @@ typedef struct SVgroupChangeInfo {
|
||||||
} SVgroupChangeInfo;
|
} SVgroupChangeInfo;
|
||||||
|
|
||||||
static int32_t mndNodeCheckSentinel = 0;
|
static int32_t mndNodeCheckSentinel = 0;
|
||||||
static SStreamExecNodeInfo execNodeList;
|
static SStreamExecInfo execInfo;
|
||||||
|
|
||||||
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
|
||||||
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
|
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
|
||||||
|
@ -77,18 +77,17 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
||||||
int64_t streamId, int32_t taskId);
|
int64_t streamId, int32_t taskId);
|
||||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||||
|
|
||||||
static SArray *extractNodeListFromStream(SMnode *pMnode);
|
static SArray *extractNodeListFromStream(SMnode *pMnode);
|
||||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
|
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
|
||||||
|
|
||||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||||
|
|
||||||
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name);
|
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name);
|
||||||
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
|
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
|
||||||
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
|
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
|
||||||
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
|
||||||
|
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
|
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
|
|
||||||
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
|
||||||
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
|
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
|
||||||
|
|
||||||
|
@ -130,18 +129,18 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
|
||||||
|
|
||||||
taosThreadMutexInit(&execNodeList.lock, NULL);
|
taosThreadMutexInit(&execInfo.lock, NULL);
|
||||||
execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndCleanupStream(SMnode *pMnode) {
|
void mndCleanupStream(SMnode *pMnode) {
|
||||||
taosArrayDestroy(execNodeList.pTaskList);
|
taosArrayDestroy(execInfo.pTaskList);
|
||||||
taosHashCleanup(execNodeList.pTaskMap);
|
taosHashCleanup(execInfo.pTaskMap);
|
||||||
taosThreadMutexDestroy(&execNodeList.lock);
|
taosThreadMutexDestroy(&execInfo.lock);
|
||||||
mDebug("mnd stream cleanup");
|
mDebug("mnd stream exec info cleanup");
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
|
||||||
|
@ -848,10 +847,10 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
taosThreadMutexLock(&execNodeList.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
mDebug("register to stream task node list");
|
mDebug("register to stream task node list");
|
||||||
keepStreamTasksInBuf(&streamObj, &execNodeList);
|
keepStreamTasksInBuf(&streamObj, &execInfo);
|
||||||
taosThreadMutexUnlock(&execNodeList.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
@ -883,9 +882,8 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t checkpointId = taosGetTimestampMs();
|
|
||||||
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
||||||
pMsg->checkpointId = checkpointId;
|
pMsg->checkpointId = taosGetTimestampMs();
|
||||||
|
|
||||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||||
|
@ -1085,6 +1083,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
|
||||||
pStream->checkpointId = chkptId;
|
pStream->checkpointId = chkptId;
|
||||||
pStream->checkpointFreq = taosGetTimestampMs();
|
pStream->checkpointFreq = taosGetTimestampMs();
|
||||||
pStream->currentTick = 0;
|
pStream->currentTick = 0;
|
||||||
|
|
||||||
// 3. commit log: stream checkpoint info
|
// 3. commit log: stream checkpoint info
|
||||||
pStream->version = pStream->version + 1;
|
pStream->version = pStream->version + 1;
|
||||||
|
|
||||||
|
@ -1134,22 +1133,22 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
{ // check if the node update happens or not
|
{ // check if the node update happens or not
|
||||||
int64_t ts = taosGetTimestampSec();
|
int64_t ts = taosGetTimestampSec();
|
||||||
|
|
||||||
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
|
if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) {
|
||||||
if (execNodeList.pNodeEntryList != NULL) {
|
if (execInfo.pNodeEntryList != NULL) {
|
||||||
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
|
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
|
||||||
}
|
}
|
||||||
|
|
||||||
execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode);
|
execInfo.pNodeEntryList = extractNodeListFromStream(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
|
if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) {
|
||||||
mDebug("stream task node change checking done, no vgroups exist, do nothing");
|
mDebug("stream task node change checking done, no vgroups exist, do nothing");
|
||||||
execNodeList.ts = ts;
|
execInfo.ts = ts;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
|
||||||
SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
|
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
|
||||||
if (pNodeEntry->stageUpdated) {
|
if (pNodeEntry->stageUpdated) {
|
||||||
mDebug("stream task not ready due to node update detected, checkpoint not issued");
|
mDebug("stream task not ready due to node update detected, checkpoint not issued");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1158,7 +1157,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
|
||||||
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
||||||
taosHashCleanup(changeInfo.pDBMap);
|
taosHashCleanup(changeInfo.pDBMap);
|
||||||
|
@ -1173,10 +1172,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
{ // check if all tasks are in TASK_STATUS__READY status
|
{ // check if all tasks are in TASK_STATUS__READY status
|
||||||
bool ready = true;
|
bool ready = true;
|
||||||
|
|
||||||
taosThreadMutexLock(&execNodeList.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
|
||||||
STaskId *p = taosArrayGet(execNodeList.pTaskList, i);
|
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
|
||||||
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p));
|
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
|
||||||
if (pEntry == NULL) {
|
if (pEntry == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1188,7 +1187,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&execNodeList.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
if (!ready) {
|
if (!ready) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1229,11 +1228,16 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("failed to prepre trans rebalance since %s", terrstr());
|
mError("failed to prepare trans rebalance since %s", terrstr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
|
// only one trans here
|
||||||
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
execInfo.activeCheckpoint = checkpointId;
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1311,7 +1315,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
removeStreamTasksInBuf(pStream, &execNodeList);
|
removeStreamTasksInBuf(pStream, &execInfo);
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
|
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
|
||||||
|
@ -1562,7 +1566,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||||
STaskStatusEntry* pe = taosHashGet(execNodeList.pTaskMap, &id, sizeof(id));
|
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||||
if (pe == NULL) {
|
if (pe == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2196,12 +2200,12 @@ static void doExtractTasksFromStream(SMnode *pMnode) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
keepStreamTasksInBuf(pStream, &execNodeList);
|
keepStreamTasksInBuf(pStream, &execInfo);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doRemoveFromTask(SStreamExecNodeInfo* pExecNode, STaskId* pRemovedId) {
|
static int32_t doRemoveFromTask(SStreamExecInfo* pExecNode, STaskId* pRemovedId) {
|
||||||
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId));
|
||||||
|
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
|
@ -2236,10 +2240,10 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) {
|
||||||
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
||||||
SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId));
|
SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId));
|
||||||
|
|
||||||
int32_t numOfTask = taosArrayGetSize(execNodeList.pTaskList);
|
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
|
||||||
for(int32_t i = 0; i < numOfTask; ++i) {
|
for(int32_t i = 0; i < numOfTask; ++i) {
|
||||||
STaskId* pId = taosArrayGet(execNodeList.pTaskList, i);
|
STaskId* pId = taosArrayGet(execInfo.pTaskList, i);
|
||||||
STaskStatusEntry* pEntry = taosHashGet(execNodeList.pTaskMap, pId, sizeof(*pId));
|
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
|
||||||
|
|
||||||
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
|
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
|
||||||
if (!existed) {
|
if (!existed) {
|
||||||
|
@ -2249,16 +2253,16 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) {
|
||||||
STaskId* pId = taosArrayGet(pRemoveTaskList, i);
|
STaskId* pId = taosArrayGet(pRemoveTaskList, i);
|
||||||
doRemoveFromTask(&execNodeList, pId);
|
doRemoveFromTask(&execInfo, pId);
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList),
|
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList),
|
||||||
(int32_t) taosArrayGetSize(execNodeList.pTaskList));
|
(int32_t) taosArrayGetSize(execInfo.pTaskList));
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pNodeSnapshot);
|
int32_t size = taosArrayGetSize(pNodeSnapshot);
|
||||||
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
|
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
|
||||||
SNodeEntry* p = taosArrayGet(execNodeList.pNodeEntryList, i);
|
SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i);
|
||||||
|
|
||||||
for(int32_t j = 0; j < size; ++j) {
|
for(int32_t j = 0; j < size; ++j) {
|
||||||
SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j);
|
SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j);
|
||||||
|
@ -2269,8 +2273,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
|
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
|
||||||
execNodeList.pNodeEntryList = pValidNodeEntryList;
|
execInfo.pNodeEntryList = pValidNodeEntryList;
|
||||||
|
|
||||||
taosArrayDestroy(pRemoveTaskList);
|
taosArrayDestroy(pRemoveTaskList);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -2289,26 +2293,26 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
int64_t ts = taosGetTimestampSec();
|
int64_t ts = taosGetTimestampSec();
|
||||||
|
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
|
if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) {
|
||||||
if (execNodeList.pNodeEntryList != NULL) {
|
if (execInfo.pNodeEntryList != NULL) {
|
||||||
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
|
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
|
||||||
}
|
}
|
||||||
execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode);
|
execInfo.pNodeEntryList = extractNodeListFromStream(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) {
|
if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) {
|
||||||
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
|
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
|
||||||
execNodeList.ts = ts;
|
execInfo.ts = ts;
|
||||||
atomic_store_32(&mndNodeCheckSentinel, 0);
|
atomic_store_32(&mndNodeCheckSentinel, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
|
||||||
|
|
||||||
taosThreadMutexLock(&execNodeList.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
removeExpirednodeEntryAndTask(pNodeSnapshot);
|
removeExpirednodeEntryAndTask(pNodeSnapshot);
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
|
||||||
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
||||||
|
|
||||||
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
||||||
|
@ -2318,16 +2322,16 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
// keep the new vnode snapshot
|
// keep the new vnode snapshot
|
||||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mDebug("create trans successfully, update cached node list");
|
mDebug("create trans successfully, update cached node list");
|
||||||
taosArrayDestroy(execNodeList.pNodeEntryList);
|
taosArrayDestroy(execInfo.pNodeEntryList);
|
||||||
execNodeList.pNodeEntryList = pNodeSnapshot;
|
execInfo.pNodeEntryList = pNodeSnapshot;
|
||||||
execNodeList.ts = ts;
|
execInfo.ts = ts;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
mDebug("no update found in nodeList");
|
mDebug("no update found in nodeList");
|
||||||
taosArrayDestroy(pNodeSnapshot);
|
taosArrayDestroy(pNodeSnapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execNodeList.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
||||||
taosHashCleanup(changeInfo.pDBMap);
|
taosHashCleanup(changeInfo.pDBMap);
|
||||||
|
|
||||||
|
@ -2359,7 +2363,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) {
|
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
|
||||||
int32_t level = taosArrayGetSize(pStream->tasks);
|
int32_t level = taosArrayGetSize(pStream->tasks);
|
||||||
|
|
||||||
for (int32_t i = 0; i < level; i++) {
|
for (int32_t i = 0; i < level; i++) {
|
||||||
|
@ -2384,7 +2388,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode) {
|
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
|
||||||
int32_t level = taosArrayGetSize(pStream->tasks);
|
int32_t level = taosArrayGetSize(pStream->tasks);
|
||||||
for (int32_t i = 0; i < level; i++) {
|
for (int32_t i = 0; i < level; i++) {
|
||||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||||
|
@ -2577,24 +2581,24 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
||||||
|
|
||||||
taosThreadMutexLock(&execNodeList.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
int32_t numOfExisted = taosHashGetSize(execNodeList.pTaskMap);
|
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
|
||||||
if (numOfExisted == 0) {
|
if (numOfExisted == 0) {
|
||||||
doExtractTasksFromStream(pMnode);
|
doExtractTasksFromStream(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||||
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||||
STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, &p->id, sizeof(p->id));
|
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
||||||
if (pEntry == NULL) {
|
if (pEntry == NULL) {
|
||||||
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->stage != pEntry->stage && pEntry->stage != -1) {
|
if (p->stage != pEntry->stage && pEntry->stage != -1) {
|
||||||
int32_t numOfNodes = taosArrayGetSize(execNodeList.pNodeEntryList);
|
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
|
||||||
for(int32_t j = 0; j < numOfNodes; ++j) {
|
for(int32_t j = 0; j < numOfNodes; ++j) {
|
||||||
SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, j);
|
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
|
||||||
if (pNodeEntry->nodeId == pEntry->nodeId) {
|
if (pNodeEntry->nodeId == pEntry->nodeId) {
|
||||||
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
|
mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64,
|
||||||
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
|
pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId);
|
||||||
|
@ -2628,16 +2632,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
// current checkpoint is failed, rollback from the checkpoint trans
|
// current checkpoint is failed, rollback from the checkpoint trans
|
||||||
// kill the checkpoint trans and then set all tasks status to be normal
|
// kill the checkpoint trans and then set all tasks status to be normal
|
||||||
if (checkpointFailed && activeCheckpointId != 0) {
|
if (checkpointFailed && activeCheckpointId != 0) {
|
||||||
if (execNodeList.activeCheckpoint != activeCheckpointId) {
|
ASSERT(execInfo.activeCheckpoint == activeCheckpointId);
|
||||||
mInfo("checkpointId:%"PRId64" failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
|
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
|
||||||
execNodeList.activeCheckpoint = activeCheckpointId;
|
// execInfo.activeCheckpoint = activeCheckpointId;
|
||||||
mndResetFromCheckpoint(pMnode);
|
mndResetFromCheckpoint(pMnode);
|
||||||
} else {
|
// } else {
|
||||||
mDebug("checkpoint:%"PRId64" reset has issued already, ignore it", activeCheckpointId);
|
// mDebug("checkpoint:%"PRId64" reset has issued already, ignore it", activeCheckpointId);
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execNodeList.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
taosArrayDestroy(req.pTaskStatus);
|
taosArrayDestroy(req.pTaskStatus);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue