fix(stream): fix syntax error.
This commit is contained in:
parent
a29903101a
commit
cbc0bceec6
|
@ -63,8 +63,8 @@ static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
||||||
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
|
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code);
|
||||||
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg* pReq);
|
static int32_t mndProcessDropOrphanTaskReq(SRpcMsg* pReq);
|
||||||
|
static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo* pInfo);
|
||||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo *pInfo);
|
||||||
|
|
||||||
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
||||||
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
|
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
|
||||||
|
@ -1142,6 +1142,9 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool taskNodeIsUpdated(SMnode *pMnode) {
|
static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||||
|
bool allReady = true;
|
||||||
|
SArray *pNodeSnapshot = NULL;
|
||||||
|
|
||||||
// check if the node update happens or not
|
// check if the node update happens or not
|
||||||
streamMutexLock(&execInfo.lock);
|
streamMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
|
@ -1166,13 +1169,11 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool allReady = true;
|
|
||||||
SArray *pNodeSnapshot = NULL;
|
|
||||||
|
|
||||||
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
|
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
|
||||||
if (code) {
|
if (code) {
|
||||||
mError("failed to get the vgroup snapshot, ignore it and continue");
|
mError("failed to get the vgroup snapshot, ignore it and continue");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!allReady) {
|
if (!allReady) {
|
||||||
mWarn("not all vnodes ready, quit from vnodes status check");
|
mWarn("not all vnodes ready, quit from vnodes status check");
|
||||||
taosArrayDestroy(pNodeSnapshot);
|
taosArrayDestroy(pNodeSnapshot);
|
||||||
|
@ -1180,12 +1181,16 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = {0};
|
||||||
|
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
|
||||||
|
if (code) {
|
||||||
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||||
|
|
||||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
mndDestroyVgroupChangeInfo(&changeInfo);
|
||||||
taosHashCleanup(changeInfo.pDBMap);
|
|
||||||
taosArrayDestroy(pNodeSnapshot);
|
taosArrayDestroy(pNodeSnapshot);
|
||||||
|
|
||||||
if (nodeUpdated) {
|
if (nodeUpdated) {
|
||||||
|
@ -1972,11 +1977,22 @@ static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent)
|
||||||
// tasks on the will be removed replica.
|
// tasks on the will be removed replica.
|
||||||
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
|
// 3. vgroup redistribution is an combination operation of first increase replica and then decrease replica. So we
|
||||||
// will handle it as mentioned in 1 & 2 items.
|
// will handle it as mentioned in 1 & 2 items.
|
||||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList) {
|
static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
|
||||||
SVgroupChangeInfo info = {
|
SVgroupChangeInfo *pInfo) {
|
||||||
.pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
|
int32_t code = 0;
|
||||||
.pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
|
int32_t lino = 0;
|
||||||
};
|
|
||||||
|
if (pInfo == NULL) {
|
||||||
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)),
|
||||||
|
pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||||
|
|
||||||
|
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
|
||||||
|
mndDestroyVgroupChangeInfo(pInfo);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
|
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
|
||||||
for (int32_t i = 0; i < numOfNodes; ++i) {
|
for (int32_t i = 0; i < numOfNodes; ++i) {
|
||||||
|
@ -1997,7 +2013,11 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset);
|
||||||
|
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
(void) epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error
|
code = epsetToStr(&pCurrent->epset, buf, tListLen(buf)); // ignore this error
|
||||||
|
if (code) {
|
||||||
|
mError("failed to convert epset string, code:%s", tstrerror(code));
|
||||||
|
TSDB_CHECK_CODE(code, lino, _err);
|
||||||
|
}
|
||||||
|
|
||||||
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
|
mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId,
|
||||||
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
|
pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated);
|
||||||
|
@ -2006,20 +2026,16 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
|
epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset);
|
||||||
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
|
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
|
||||||
|
|
||||||
void* p = taosArrayPush(info.pUpdateNodeList, &updateInfo);
|
void* p = taosArrayPush(pInfo->pUpdateNodeList, &updateInfo);
|
||||||
if (p == NULL) {
|
TSDB_CHECK_NULL(p, code, lino, _err, terrno);
|
||||||
mError("failed to put update entry into node list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle the snode info
|
// todo handle the snode info
|
||||||
if (pCurrent->nodeId != SNODE_HANDLE) {
|
if (pCurrent->nodeId != SNODE_HANDLE) {
|
||||||
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
|
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
|
||||||
int32_t code = taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
|
code = taosHashPut(pInfo->pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
|
||||||
mndReleaseVgroup(pMnode, pVgroup);
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _err);
|
||||||
mError("failed to put into dbmap, code:out of memory");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -2027,7 +2043,18 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return info;
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
mndDestroyVgroupChangeInfo(pInfo);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndDestroyVgroupChangeInfo(SVgroupChangeInfo* pInfo) {
|
||||||
|
if (pInfo != NULL) {
|
||||||
|
taosArrayDestroy(pInfo->pUpdateNodeList);
|
||||||
|
taosHashCleanup(pInfo->pDBMap);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
|
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
|
||||||
|
@ -2271,7 +2298,11 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = {0};
|
||||||
|
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
|
||||||
|
if (code) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
|
if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
|
||||||
|
@ -2305,8 +2336,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
mDebug("no update found in nodeList");
|
mDebug("no update found in nodeList");
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
mndDestroyVgroupChangeInfo(&changeInfo);
|
||||||
taosHashCleanup(changeInfo.pDBMap);
|
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
|
|
@ -416,7 +416,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
|
||||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
||||||
int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
|
int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("%s failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
|
stError("0x%x failed create stream task id str, code:%s", pTask->id.taskId, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue