Merge pull request #22095 from taosdata/fix/TS-3672
fix:add sdbFetchCancel to release hash node
This commit is contained in:
commit
0d922c0d66
|
@ -77,7 +77,6 @@ static SClusterObj *mndAcquireCluster(SMnode *pMnode, void **ppIter) {
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
*ppIter = pIter;
|
*ppIter = pIter;
|
||||||
|
|
||||||
return pCluster;
|
return pCluster;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1303,11 +1303,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
|
||||||
if (pDb && (vindex >= pDb->cfg.numOfVgroups)) {
|
if (pDb && (vindex >= pDb->cfg.numOfVgroups)) {
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbCancelFetch(pSdb, pIter);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq) {
|
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq) {
|
||||||
|
|
|
@ -706,6 +706,7 @@ _OVER:
|
||||||
} else {
|
} else {
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
}
|
}
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
return terrno;
|
return terrno;
|
||||||
|
|
|
@ -831,6 +831,7 @@ int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxOb
|
||||||
if (pIdx->stbUid == pStb->uid && strcasecmp(pIdx->colName, tagName) == 0) {
|
if (pIdx->stbUid == pStb->uid && strcasecmp(pIdx->colName, tagName) == 0) {
|
||||||
memcpy((char *)idx, (char *)pIdx, sizeof(SIdxObj));
|
memcpy((char *)idx, (char *)pIdx, sizeof(SIdxObj));
|
||||||
sdbRelease(pSdb, pIdx);
|
sdbRelease(pSdb, pIdx);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -851,7 +852,7 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
|
||||||
if (pIdx->dbUid == pDb->uid) {
|
if (pIdx->dbUid == pDb->uid) {
|
||||||
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) {
|
if (mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx) != 0) {
|
||||||
sdbRelease(pSdb, pIdx);
|
sdbRelease(pSdb, pIdx);
|
||||||
sdbCancelFetch(pSdb, pIdx);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -454,6 +454,7 @@ int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) {
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
|
|
||||||
if (limit > 0 && numOfRows >= limit) {
|
if (limit > 0 && numOfRows >= limit) {
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,6 +168,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
// TODO random fetch
|
// TODO random fetch
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
|
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
return pObj;
|
return pObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,6 +198,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
|
||||||
sdbRelease(pMnode->pSdb, pVgroup);
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
return pVgroup;
|
return pVgroup;
|
||||||
}
|
}
|
||||||
return pVgroup;
|
return pVgroup;
|
||||||
|
@ -435,6 +437,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,6 +447,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
|
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -453,6 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -492,6 +497,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -900,7 +900,6 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SMsgHead *pHead = rpcMallocCont(contLen);
|
SMsgHead *pHead = rpcMallocCont(contLen);
|
||||||
if (pHead == NULL) {
|
if (pHead == NULL) {
|
||||||
sdbCancelFetch(pSdb, pVgroup);
|
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1240,6 +1239,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||||
mError("topic:%s, create ast error", pTopic->name);
|
mError("topic:%s, create ast error", pTopic->name);
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1260,6 +1260,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
nodesDestroyList(pNodeList);
|
nodesDestroyList(pNodeList);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1287,6 +1288,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
|
||||||
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||||
mError("stream:%s, create ast error", pStream->name);
|
mError("stream:%s, create ast error", pStream->name);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1306,6 +1308,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
nodesDestroyList(pNodeList);
|
nodesDestroyList(pNodeList);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
mInfo("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
||||||
|
@ -1335,6 +1338,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
|
||||||
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
|
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
|
||||||
pSma->name, stbFullName, suid, colId);
|
pSma->name, stbFullName, suid, colId);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1355,6 +1359,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
nodesDestroyList(pNodeList);
|
nodesDestroyList(pNodeList);
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
mInfo("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||||
|
@ -2268,6 +2273,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
if (pTopic->stbUid == suid) {
|
if (pTopic->stbUid == suid) {
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2282,6 +2288,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||||
mError("topic:%s, create ast error", pTopic->name);
|
mError("topic:%s, create ast error", pTopic->name);
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2295,6 +2302,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
nodesDestroyList(pNodeList);
|
nodesDestroyList(pNodeList);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
goto NEXT;
|
goto NEXT;
|
||||||
|
@ -2322,6 +2330,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->targetStbUid == suid) {
|
if (pStream->targetStbUid == suid) {
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -2330,6 +2339,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
||||||
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
|
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STREAM_OPTION;
|
||||||
mError("stream:%s, create ast error", pStream->name);
|
mError("stream:%s, create ast error", pStream->name);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -2341,6 +2351,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
||||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
|
|
||||||
if (pCol->tableId == suid) {
|
if (pCol->tableId == suid) {
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
nodesDestroyList(pNodeList);
|
nodesDestroyList(pNodeList);
|
||||||
|
|
|
@ -705,12 +705,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
if (numOfStream > MND_STREAM_MAX_NUM) {
|
if (numOfStream > MND_STREAM_MAX_NUM) {
|
||||||
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
||||||
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->targetStbUid == streamObj.targetStbUid) {
|
if (pStream->targetStbUid == streamObj.targetStbUid) {
|
||||||
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
||||||
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1104,6 +1104,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
||||||
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
terrno = TSDB_CODE_MND_IN_REBALANCE;
|
terrno = TSDB_CODE_MND_IN_REBALANCE;
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||||
|
@ -1122,12 +1123,14 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -513,6 +513,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
|
@ -522,6 +523,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
|
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
@ -535,6 +537,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
buf = NULL;
|
buf = NULL;
|
||||||
|
@ -647,7 +650,6 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -698,6 +700,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
if (strcmp(name, pTopic->name) == 0) {
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
@ -711,6 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
if (strcmp(name, pTopic->name) == 0) {
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
@ -724,6 +728,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
if (strcmp(name, pTopic->name) == 0) {
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
||||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
@ -735,6 +740,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db) != 0) {
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -788,6 +794,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -796,6 +804,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
|
@ -999,6 +1008,7 @@ bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
|
||||||
|
|
||||||
if (pTopic->dbUid == pDb->uid) {
|
if (pTopic->dbUid == pDb->uid) {
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1444,7 +1444,9 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
code = -1;
|
code = -1;
|
||||||
if (mndUserDupObj(pUser, &newUser) != 0) break;
|
if (mndUserDupObj(pUser, &newUser) != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL);
|
bool inRead = (taosHashGet(newUser.readDbs, db, len) != NULL);
|
||||||
bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL);
|
bool inWrite = (taosHashGet(newUser.writeDbs, db, len) != NULL);
|
||||||
|
@ -1453,7 +1455,9 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
|
||||||
(void)taosHashRemove(newUser.writeDbs, db, len);
|
(void)taosHashRemove(newUser.writeDbs, db, len);
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
|
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break;
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1491,7 +1495,9 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
|
||||||
if (inTopic) {
|
if (inTopic) {
|
||||||
(void)taosHashRemove(newUser.topics, topic, len);
|
(void)taosHashRemove(newUser.topics, topic, len);
|
||||||
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
|
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break;
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2591,6 +2591,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
if (!mndIsDnodeOnline(pDnode, curMs)) {
|
if (!mndIsDnodeOnline(pDnode, curMs)) {
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
|
terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
|
||||||
mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
|
mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
|
||||||
sdbRelease(pMnode->pSdb, pDnode);
|
sdbRelease(pMnode->pSdb, pDnode);
|
||||||
|
|
Loading…
Reference in New Issue