diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 053bb20e2a..6e001f303e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -224,6 +224,7 @@ int32_t taosGetErrSize(); // #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) // 2.x #define TSDB_CODE_MND_USER_DISABLED TAOS_DEF_ERROR_CODE(0, 0x0315) #define TSDB_CODE_MND_INVALID_PLATFORM TAOS_DEF_ERROR_CODE(0, 0x0316) +#define TSDB_CODE_MND_RETURN_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x0317) // mnode-sdb #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320) // internal diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 308089a9c1..2b25f20fda 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -75,11 +75,11 @@ int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; + TAOS_CHECK_RETURN(tStartDecode(&decoder)); - if (tDecodeI32(&decoder, &pObj->compactId) < 0) return -1; - if (tDecodeCStrTo(&decoder, pObj->dbname) < 0) return -1; - if (tDecodeI64(&decoder, &pObj->startTime) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactId)); + TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, pObj->dbname)); + TAOS_CHECK_RETURN(tDecodeI64(&decoder, &pObj->startTime)); tEndDecode(&decoder); @@ -174,13 +174,10 @@ SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER); - if (tDeserializeSCompactObj(buf, tlen, pCompact) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) { goto OVER; } - // taosInitRWLatch(&pView->lock); - OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { @@ -227,6 +224,7 @@ void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) { // compact db int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) { + int32_t code = 0; pCompact->compactId = tGenIdPI32(); strcpy(pCompact->dbname, pDb->name); @@ -234,10 +232,14 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompac pCompact->startTime = taosGetTimestampMs(); SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact); - if (pVgRaw == NULL) return -1; - if (mndTransAppendPrepareLog(pTrans, pVgRaw) != 0) { + if (pVgRaw == NULL) { + code = TSDB_CODE_SDB_OBJ_NOT_THERE; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) { sdbFreeRaw(pVgRaw); - return -1; + TAOS_RETURN(code); } (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); @@ -335,44 +337,62 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId, int32_t dnodeid) { + int32_t code = 0; STransAction action = {0}; SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid); - if (pDnode == NULL) return -1; + if (pDnode == NULL) { + code = TSDB_CODE_SDB_OBJ_NOT_THERE; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); int32_t contLen = 0; void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid); - if (pReq == NULL) return -1; + if (pReq == NULL) { + code = TSDB_CODE_SDB_OBJ_NOT_THERE; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } action.pCont = pReq; action.contLen = contLen; action.msgType = TDMT_VND_KILL_COMPACT; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } return 0; } static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) { + int32_t code = 0; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact"); if (pTrans == NULL) { mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr()); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId); mndTransSetDbName(pTrans, pCompact->dbname, NULL); SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + mndTransDrop(pTrans); + TAOS_RETURN(code); + } + if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - return -1; + TAOS_RETURN(code); } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); @@ -386,14 +406,20 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId); if (pVgroup == NULL) { mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr()); + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pDetail); mndTransDrop(pTrans); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } - if (mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId) != 0) { + if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) { mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr()); + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pDetail); mndTransDrop(pTrans); - return -1; + TAOS_RETURN(code); } mndReleaseVgroup(pMnode, pVgroup); @@ -412,10 +438,10 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa sdbRelease(pMnode->pSdb, pDetail); } - if (mndTransPrepare(pMnode, pTrans) != 0) { + if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - return -1; + TAOS_RETURN(code); } mndTransDrop(pTrans); @@ -423,30 +449,27 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa } int32_t mndProcessKillCompactReq(SRpcMsg *pReq) { + int32_t code = 0; + int32_t lino = 0; SKillCompactReq killCompactReq = {0}; - if (tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + + if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) { + TAOS_RETURN(code); } mInfo("start to kill compact:%" PRId32, killCompactReq.compactId); SMnode *pMnode = pReq->info.node; - int32_t code = -1; SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId); if (pCompact == NULL) { - terrno = TSDB_CODE_MND_INVALID_COMPACT_ID; + code = TSDB_CODE_MND_INVALID_COMPACT_ID; tFreeSKillCompactReq(&killCompactReq); - return -1; + TAOS_RETURN(code); } - if (0 != mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB)) { - goto _OVER; - } + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER); - if (mndKillCompact(pMnode, pReq, pCompact) < 0) { - goto _OVER; - } + TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER); code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -463,12 +486,14 @@ _OVER: tFreeSKillCompactReq(&killCompactReq); sdbRelease(pMnode->pSdb, pCompact); - return code; + TAOS_RETURN(code); } // update progress static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp *rsp) { + int32_t code = 0; + void *pIter = NULL; while (1) { SCompactDetailObj *pDetail = NULL; @@ -479,9 +504,10 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c pDetail->newNumberFileset = rsp->numberFileset; pDetail->newFinished = rsp->finished; + sdbCancelFetch(pMnode->pSdb, pIter); sdbRelease(pMnode->pSdb, pDetail); - return 0; + TAOS_RETURN(code); } sdbRelease(pMnode->pSdb, pDetail); @@ -491,14 +517,13 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c } int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) { - SQueryCompactProgressRsp req = {0}; int32_t code = 0; + SQueryCompactProgressRsp req = {0}; code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req); if (code != 0) { - terrno = TSDB_CODE_INVALID_MSG; mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont, pReq->contLen); - return -1; + TAOS_RETURN(code); } mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId, @@ -508,13 +533,12 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) { code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req); if (code != 0) { - terrno = code; mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId, req.vgId, req.dnodeId, req.numberFileset, req.finished); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } // timer @@ -531,7 +555,10 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId); if (pDnode == NULL) break; - addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port); + if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) { + sdbRelease(pMnode->pSdb, pDetail); + continue; + } mndReleaseDnode(pMnode, pDnode); SQueryCompactProgressReq req; @@ -541,8 +568,6 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req); if (contLen < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbCancelFetch(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail); continue; } @@ -551,7 +576,6 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { SMsgHead *pHead = rpcMallocCont(contLen); if (pHead == NULL) { - sdbCancelFetch(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail); continue; } @@ -563,13 +587,6 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen}; - // rpcMsg.pCont = rpcMallocCont(contLen); - // if (rpcMsg.pCont == NULL) { - // return; - // } - - // memcpy(rpcMsg.pCont, pHead, contLen); - rpcMsg.pCont = pHead; char detail[1024] = {0}; @@ -589,6 +606,7 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { } static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { + int32_t code = 0; bool needSave = false; void *pIter = NULL; while (1) { @@ -612,7 +630,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { } SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); - if (pCompact == NULL) return 0; + if (pCompact == NULL) TAOS_RETURN(code); SDbObj *pDb = mndAcquireDb(pMnode, pCompact->dbname); if (pDb == NULL) { @@ -625,13 +643,15 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { if (!needSave) { mDebug("compact:%" PRId32 ", no need to save", compactId); - return 0; + TAOS_RETURN(code); } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress"); if (pTrans == NULL) { mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr()); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id); @@ -654,10 +674,20 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { pDetail->finished = pDetail->newFinished; SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr()); + if (pCommitRaw == NULL) { + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pDetail); mndTransDrop(pTrans); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) { + mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr()); + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pDetail); + mndTransDrop(pTrans); + TAOS_RETURN(code); } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); } @@ -678,11 +708,13 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { if (pDetail->numberFileset == -1 && pDetail->finished == -1) { allFinished = false; + sdbCancelFetch(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail); break; } if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) { allFinished = false; + sdbCancelFetch(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail); break; } @@ -710,11 +742,19 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { if (pDetail->compactId == pCompact->compactId) { SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + if (pCommitRaw == NULL) { + mndTransDrop(pTrans); + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) { mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr()); + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pDetail); mndTransDrop(pTrans); - return -1; + TAOS_RETURN(code); } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId); @@ -724,20 +764,26 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { } SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + if (pCommitRaw == NULL) { + mndTransDrop(pTrans); + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) { mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr()); mndTransDrop(pTrans); - return -1; + TAOS_RETURN(code); } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); mInfo("compact:%d, add drop compact action", pCompact->compactId); } - if (mndTransPrepare(pMnode, pTrans) != 0) { + if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr()); mndTransDrop(pTrans); sdbRelease(pMnode->pSdb, pCompact); - return -1; + TAOS_RETURN(code); } sdbRelease(pMnode->pSdb, pCompact); @@ -746,6 +792,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { } void mndCompactPullup(SMnode *pMnode) { + int32_t code = 0; SSdb *pSdb = pMnode->pSdb; SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t)); if (pArray == NULL) return; @@ -766,7 +813,9 @@ void mndCompactPullup(SMnode *pMnode) { if (pCompact != NULL) { mInfo("compact:%d, begin to pull up", pCompact->compactId); mndCompactSendProgressReq(pMnode, pCompact); - mndSaveCompactProgress(pMnode, pCompact->compactId); + if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) { + mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code)); + } } mndReleaseCompact(pMnode, pCompact); } diff --git a/source/dnode/mnode/impl/src/mndCompactDetail.c b/source/dnode/mnode/impl/src/mndCompactDetail.c index 6b1dd78093..7d73cf8dcd 100644 --- a/source/dnode/mnode/impl/src/mndCompactDetail.c +++ b/source/dnode/mnode/impl/src/mndCompactDetail.c @@ -125,17 +125,17 @@ int32_t tDeserializeSCompactDetailObj(void *buf, int32_t bufLen, SCompactDetailO SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); - if (tStartDecode(&decoder) < 0) return -1; - - if (tDecodeI32(&decoder, &pObj->compactDetailId) < 0) return -1; - if (tDecodeI32(&decoder, &pObj->compactId) < 0) return -1; - if (tDecodeI32(&decoder, &pObj->vgId) < 0) return -1; - if (tDecodeI32(&decoder, &pObj->dnodeId) < 0) return -1; - if (tDecodeI32(&decoder, &pObj->numberFileset) < 0) return -1; - if (tDecodeI32(&decoder, &pObj->finished) < 0) return -1; - if (tDecodeI64(&decoder, &pObj->startTime) < 0) return -1; - if (tDecodeI32(&decoder, &pObj->newNumberFileset) < 0) return -1; - if (tDecodeI32(&decoder, &pObj->newFinished) < 0) return -1; + TAOS_CHECK_RETURN(tStartDecode(&decoder)); + + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactDetailId)); + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactId)); + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->vgId)); + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->dnodeId)); + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->numberFileset)); + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->finished)); + TAOS_CHECK_RETURN(tDecodeI64(&decoder, &pObj->startTime)); + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->newNumberFileset)); + TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->newFinished)); tEndDecode(&decoder); @@ -231,13 +231,10 @@ SSdbRow *mndCompactDetailActionDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER); - if (tDeserializeSCompactDetailObj(buf, tlen, pCompact) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + if ((terrno = tDeserializeSCompactDetailObj(buf, tlen, pCompact)) < 0) { goto OVER; } - //taosInitRWLatch(&pView->lock); - OVER: taosMemoryFreeClear(buf); if (terrno != TSDB_CODE_SUCCESS) { @@ -274,6 +271,7 @@ int32_t mndCompactDetailActionUpdate(SSdb *pSdb, SCompactDetailObj *pOldCompact, int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SVgObj *pVgroup, SVnodeGid *pVgid, int32_t index){ + int32_t code = 0; SCompactDetailObj compactDetail = {0}; compactDetail.compactDetailId = index; compactDetail.compactId = pCompact->compactId; @@ -292,9 +290,11 @@ int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* p if (pVgRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { sdbFreeRaw(pVgRaw); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); - return 0; + TAOS_RETURN(code); } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9a7a8155ec..5511506021 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -95,26 +95,21 @@ static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const ch bool enableReplay) { SMqTopicObj *pTopic = NULL; int32_t code = 0; + int32_t lino = 0; int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { char *pOneTopic = taosArrayGetP(pTopicList, i); pTopic = mndAcquireTopic(pMnode, pOneTopic); if (pTopic == NULL) { // terrno has been set by callee function - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto FAILED; } - if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { - code = TSDB_CODE_MND_NO_RIGHTS; - terrno = TSDB_CODE_MND_NO_RIGHTS; - goto FAILED; - } + TAOS_CHECK_GOTO(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic), &lino, FAILED); - if ((terrno = grantCheckExpire(TSDB_GRANT_SUBSCRIPTION)) < 0) { - code = terrno; - goto FAILED; - } + TAOS_CHECK_GOTO(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION), &lino, FAILED); if (enableReplay) { if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { @@ -123,7 +118,8 @@ static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const ch } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) { SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db); if (pDb == NULL) { - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto FAILED; } if (pDb->cfg.numOfVgroups != 1) { @@ -146,6 +142,7 @@ FAILED: static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { int32_t code = 0; + int32_t lino = 0; SMnode *pMnode = pMsg->info.node; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; SMqConsumerObj *pConsumerNew = NULL; @@ -153,7 +150,8 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); if (pConsumer == NULL) { mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto END; } @@ -161,31 +159,27 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { pConsumer->status, mndConsumerStatusName(pConsumer->status)); if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { - terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - code = -1; + code = TSDB_CODE_MND_CONSUMER_NOT_READY; goto END; } pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); if (pConsumerNew == NULL){ - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto END; } pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); if (pTrans == NULL) { - code = -1; - goto END; - } - code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false); - if (code != 0) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto END; } - code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); - if (code != 0) { - goto END; - } + TAOS_CHECK_GOTO(validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false), &lino, END); + + TAOS_CHECK_GOTO(mndSetConsumerCommitLogs(pTrans, pConsumerNew), &lino, END); code = mndTransPrepare(pMnode, pTrans); END: @@ -197,6 +191,7 @@ END: static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { int32_t code = 0; + int32_t lino = 0; SMnode *pMnode = pMsg->info.node; SMqConsumerClearMsg *pClearMsg = pMsg->pCont; SMqConsumerObj *pConsumerNew = NULL; @@ -205,7 +200,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); if (pConsumer == NULL) { mError("consumer:0x%" PRIx64 " failed to be found to clear it", pClearMsg->consumerId); - return 0; + TAOS_RETURN(code); } mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, @@ -213,21 +208,20 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL); if (pConsumerNew == NULL){ - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto END; } pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); if (pTrans == NULL) { - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto END; } // this is the drop action, not the update action - code = mndSetConsumerDropLogs(pTrans, pConsumerNew); - if (code != 0) { - goto END; - } + TAOS_CHECK_GOTO(mndSetConsumerDropLogs(pTrans, pConsumerNew), &lino, END); code = mndTransPrepare(pMnode, pTrans); @@ -310,10 +304,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqHbRsp rsp = {0}; SMqConsumerObj *pConsumer = NULL; - if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - code = TSDB_CODE_TMQ_INVALID_MSG; - goto end; - } + TAOS_CHECK_GOTO(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req), NULL, end); int64_t consumerId = req.consumerId; pConsumer = mndAcquireConsumer(pMnode, consumerId); @@ -322,10 +313,8 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; goto end; } - code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user); - if (code != 0) { - goto end; - } + + TAOS_CHECK_GOTO(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user), NULL, end); atomic_store_32(&pConsumer->hbStatus, 0); @@ -343,7 +332,7 @@ end: tDestroySMqHbRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); tDestroySMqHbReq(&req); - return code; + TAOS_RETURN(code); } static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){ @@ -447,7 +436,7 @@ static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoc pHead->walever = 0; void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqAskEpRsp(&abuf, rsp); + TAOS_CHECK_RETURN(tEncodeSMqAskEpRsp(&abuf, rsp)); // send rsp pMsg->info.rsp = buf; @@ -461,9 +450,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqAskEpRsp rsp = {0}; int32_t code = 0; - if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - return TSDB_CODE_OUT_OF_MEMORY; - } + TAOS_CHECK_RETURN(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req)); int64_t consumerId = req.consumerId; @@ -517,19 +504,29 @@ END: } int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { + int32_t code = 0; SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; - return 0; + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)); + TAOS_RETURN(code); } int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { + int32_t code = 0; SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; - return 0; + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)); + TAOS_RETURN(code); } static void freeItem(void *param) { @@ -592,8 +589,7 @@ static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){ for (int i = 0; i < newTopicNum; i++) { int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i)); if (gNum >= MND_MAX_GROUP_PER_TOPIC) { - terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; - return -1; + return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; } } return TSDB_CODE_SUCCESS; @@ -672,7 +668,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto _over; } @@ -683,14 +680,14 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew = buildSubConsumer(pMnode, &subscribe); if(pConsumerNew == NULL){ - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto _over; } - code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); - if (code != 0) goto _over; - code = mndTransPrepare(pMnode, pTrans); - if (code != 0) goto _over; + TAOS_CHECK_GOTO(mndSetConsumerCommitLogs(pTrans, pConsumerNew), NULL, _over); + + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _over); code = TSDB_CODE_ACTION_IN_PROGRESS; _over: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ab50b83937..4543e746a1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -168,6 +168,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, "Invalid query id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, "Invalid connection id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_DISABLED, "User is disabled") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_PLATFORM, "Unsupported feature on this platform") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_RETURN_VALUE_NULL, "Return value is null") // mnode-sdb TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_ALREADY_THERE, "Object already there")