fix/TD-30989

This commit is contained in:
dmchen 2024-07-17 11:41:15 +00:00
parent a3c0acb78a
commit f727668ebb
5 changed files with 192 additions and 144 deletions

View File

@ -224,6 +224,7 @@ int32_t taosGetErrSize();
// #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) // 2.x // #define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) // 2.x
#define TSDB_CODE_MND_USER_DISABLED TAOS_DEF_ERROR_CODE(0, 0x0315) #define TSDB_CODE_MND_USER_DISABLED TAOS_DEF_ERROR_CODE(0, 0x0315)
#define TSDB_CODE_MND_INVALID_PLATFORM TAOS_DEF_ERROR_CODE(0, 0x0316) #define TSDB_CODE_MND_INVALID_PLATFORM TAOS_DEF_ERROR_CODE(0, 0x0316)
#define TSDB_CODE_MND_RETURN_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x0317)
// mnode-sdb // mnode-sdb
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320) // internal #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320) // internal

View File

@ -75,11 +75,11 @@ int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; TAOS_CHECK_RETURN(tStartDecode(&decoder));
if (tDecodeI32(&decoder, &pObj->compactId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactId));
if (tDecodeCStrTo(&decoder, pObj->dbname) < 0) return -1; TAOS_CHECK_RETURN(tDecodeCStrTo(&decoder, pObj->dbname));
if (tDecodeI64(&decoder, &pObj->startTime) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(&decoder, &pObj->startTime));
tEndDecode(&decoder); tEndDecode(&decoder);
@ -174,13 +174,10 @@ SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
} }
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
if (tDeserializeSCompactObj(buf, tlen, pCompact) < 0) { if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto OVER; goto OVER;
} }
// taosInitRWLatch(&pView->lock);
OVER: OVER:
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
@ -227,6 +224,7 @@ void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
// compact db // compact db
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) { int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
int32_t code = 0;
pCompact->compactId = tGenIdPI32(); pCompact->compactId = tGenIdPI32();
strcpy(pCompact->dbname, pDb->name); strcpy(pCompact->dbname, pDb->name);
@ -234,10 +232,14 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompac
pCompact->startTime = taosGetTimestampMs(); pCompact->startTime = taosGetTimestampMs();
SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact); SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) {
if (mndTransAppendPrepareLog(pTrans, pVgRaw) != 0) { code = TSDB_CODE_SDB_OBJ_NOT_THERE;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
sdbFreeRaw(pVgRaw); sdbFreeRaw(pVgRaw);
return -1; TAOS_RETURN(code);
} }
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
@ -335,44 +337,62 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId, static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
int32_t dnodeid) { int32_t dnodeid) {
int32_t code = 0;
STransAction action = {0}; STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid); SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
if (pDnode == NULL) return -1; if (pDnode == NULL) {
code = TSDB_CODE_SDB_OBJ_NOT_THERE;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid); void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
if (pReq == NULL) return -1; if (pReq == NULL) {
code = TSDB_CODE_SDB_OBJ_NOT_THERE;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_KILL_COMPACT; action.msgType = TDMT_VND_KILL_COMPACT;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; TAOS_RETURN(code);
} }
return 0; return 0;
} }
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) { static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
int32_t code = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr()); mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
} }
mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId); mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
mndTransSetDbName(pTrans, pCompact->dbname, NULL); mndTransSetDbName(pTrans, pCompact->dbname, NULL);
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mndTransDrop(pTrans);
TAOS_RETURN(code);
}
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; TAOS_RETURN(code);
} }
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
@ -386,14 +406,20 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId); SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr()); mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDetail);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
} }
if (mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId) != 0) { if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr()); mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDetail);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; TAOS_RETURN(code);
} }
mndReleaseVgroup(pMnode, pVgroup); mndReleaseVgroup(pMnode, pVgroup);
@ -412,10 +438,10 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; TAOS_RETURN(code);
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
@ -423,30 +449,27 @@ static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompa
} }
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) { int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
int32_t code = 0;
int32_t lino = 0;
SKillCompactReq killCompactReq = {0}; SKillCompactReq killCompactReq = {0};
if (tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
return -1; TAOS_RETURN(code);
} }
mInfo("start to kill compact:%" PRId32, killCompactReq.compactId); mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId); SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
if (pCompact == NULL) { if (pCompact == NULL) {
terrno = TSDB_CODE_MND_INVALID_COMPACT_ID; code = TSDB_CODE_MND_INVALID_COMPACT_ID;
tFreeSKillCompactReq(&killCompactReq); tFreeSKillCompactReq(&killCompactReq);
return -1; TAOS_RETURN(code);
} }
if (0 != mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB)) { TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER);
goto _OVER;
}
if (mndKillCompact(pMnode, pReq, pCompact) < 0) { TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);
goto _OVER;
}
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
@ -463,12 +486,14 @@ _OVER:
tFreeSKillCompactReq(&killCompactReq); tFreeSKillCompactReq(&killCompactReq);
sdbRelease(pMnode->pSdb, pCompact); sdbRelease(pMnode->pSdb, pCompact);
return code; TAOS_RETURN(code);
} }
// update progress // update progress
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
SQueryCompactProgressRsp *rsp) { SQueryCompactProgressRsp *rsp) {
int32_t code = 0;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SCompactDetailObj *pDetail = NULL; SCompactDetailObj *pDetail = NULL;
@ -479,9 +504,10 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c
pDetail->newNumberFileset = rsp->numberFileset; pDetail->newNumberFileset = rsp->numberFileset;
pDetail->newFinished = rsp->finished; pDetail->newFinished = rsp->finished;
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
return 0; TAOS_RETURN(code);
} }
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
@ -491,14 +517,13 @@ static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t c
} }
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) { int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
SQueryCompactProgressRsp req = {0};
int32_t code = 0; int32_t code = 0;
SQueryCompactProgressRsp req = {0};
code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req); code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
if (code != 0) { if (code != 0) {
terrno = TSDB_CODE_INVALID_MSG;
mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont, mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
pReq->contLen); pReq->contLen);
return -1; TAOS_RETURN(code);
} }
mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId, mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
@ -508,13 +533,12 @@ int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req); code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
if (code != 0) { if (code != 0) {
terrno = code;
mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId, mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
req.vgId, req.dnodeId, req.numberFileset, req.finished); req.vgId, req.dnodeId, req.numberFileset, req.finished);
return -1; TAOS_RETURN(code);
} }
return 0; TAOS_RETURN(code);
} }
// timer // timer
@ -531,7 +555,10 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
if (pDnode == NULL) break; if (pDnode == NULL) break;
addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port); if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
sdbRelease(pMnode->pSdb, pDetail);
continue;
}
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SQueryCompactProgressReq req; SQueryCompactProgressReq req;
@ -541,8 +568,6 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req); int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
if (contLen < 0) { if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbCancelFetch(pMnode->pSdb, pDetail);
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
continue; continue;
} }
@ -551,7 +576,6 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
SMsgHead *pHead = rpcMallocCont(contLen); SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) { if (pHead == NULL) {
sdbCancelFetch(pMnode->pSdb, pDetail);
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
continue; continue;
} }
@ -563,13 +587,6 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
// rpcMsg.pCont = rpcMallocCont(contLen);
// if (rpcMsg.pCont == NULL) {
// return;
// }
// memcpy(rpcMsg.pCont, pHead, contLen);
rpcMsg.pCont = pHead; rpcMsg.pCont = pHead;
char detail[1024] = {0}; char detail[1024] = {0};
@ -589,6 +606,7 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
} }
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) { static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
int32_t code = 0;
bool needSave = false; bool needSave = false;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
@ -612,7 +630,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
} }
SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
if (pCompact == NULL) return 0; if (pCompact == NULL) TAOS_RETURN(code);
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->dbname); SDbObj *pDb = mndAcquireDb(pMnode, pCompact->dbname);
if (pDb == NULL) { if (pDb == NULL) {
@ -625,13 +643,15 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
if (!needSave) { if (!needSave) {
mDebug("compact:%" PRId32 ", no need to save", compactId); mDebug("compact:%" PRId32 ", no need to save", compactId);
return 0; TAOS_RETURN(code);
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr()); mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
} }
mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id); mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
@ -654,10 +674,20 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
pDetail->finished = pDetail->newFinished; pDetail->finished = pDetail->newFinished;
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL) {
mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr()); sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDetail);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDetail);
mndTransDrop(pTrans);
TAOS_RETURN(code);
} }
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
} }
@ -678,11 +708,13 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
if (pDetail->numberFileset == -1 && pDetail->finished == -1) { if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
allFinished = false; allFinished = false;
sdbCancelFetch(pMnode->pSdb, pDetail);
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
break; break;
} }
if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) { if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
allFinished = false; allFinished = false;
sdbCancelFetch(pMnode->pSdb, pDetail);
sdbRelease(pMnode->pSdb, pDetail); sdbRelease(pMnode->pSdb, pDetail);
break; break;
} }
@ -710,11 +742,19 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
if (pDetail->compactId == pCompact->compactId) { if (pDetail->compactId == pCompact->compactId) {
SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL) {
mndTransDrop(pTrans);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
terrstr()); terrstr());
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pDetail);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; TAOS_RETURN(code);
} }
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId); mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
@ -724,20 +764,26 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
} }
SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL) {
mndTransDrop(pTrans);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
}
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr()); mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; TAOS_RETURN(code);
} }
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
mInfo("compact:%d, add drop compact action", pCompact->compactId); mInfo("compact:%d, add drop compact action", pCompact->compactId);
} }
if (mndTransPrepare(pMnode, pTrans) != 0) { if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr()); mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
sdbRelease(pMnode->pSdb, pCompact); sdbRelease(pMnode->pSdb, pCompact);
return -1; TAOS_RETURN(code);
} }
sdbRelease(pMnode->pSdb, pCompact); sdbRelease(pMnode->pSdb, pCompact);
@ -746,6 +792,7 @@ static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
} }
void mndCompactPullup(SMnode *pMnode) { void mndCompactPullup(SMnode *pMnode) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t)); SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
if (pArray == NULL) return; if (pArray == NULL) return;
@ -766,7 +813,9 @@ void mndCompactPullup(SMnode *pMnode) {
if (pCompact != NULL) { if (pCompact != NULL) {
mInfo("compact:%d, begin to pull up", pCompact->compactId); mInfo("compact:%d, begin to pull up", pCompact->compactId);
mndCompactSendProgressReq(pMnode, pCompact); mndCompactSendProgressReq(pMnode, pCompact);
mndSaveCompactProgress(pMnode, pCompact->compactId); if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
}
} }
mndReleaseCompact(pMnode, pCompact); mndReleaseCompact(pMnode, pCompact);
} }

View File

@ -125,17 +125,17 @@ int32_t tDeserializeSCompactDetailObj(void *buf, int32_t bufLen, SCompactDetailO
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; TAOS_CHECK_RETURN(tStartDecode(&decoder));
if (tDecodeI32(&decoder, &pObj->compactDetailId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactDetailId));
if (tDecodeI32(&decoder, &pObj->compactId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->compactId));
if (tDecodeI32(&decoder, &pObj->vgId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->vgId));
if (tDecodeI32(&decoder, &pObj->dnodeId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->dnodeId));
if (tDecodeI32(&decoder, &pObj->numberFileset) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->numberFileset));
if (tDecodeI32(&decoder, &pObj->finished) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->finished));
if (tDecodeI64(&decoder, &pObj->startTime) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(&decoder, &pObj->startTime));
if (tDecodeI32(&decoder, &pObj->newNumberFileset) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->newNumberFileset));
if (tDecodeI32(&decoder, &pObj->newFinished) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(&decoder, &pObj->newFinished));
tEndDecode(&decoder); tEndDecode(&decoder);
@ -231,13 +231,10 @@ SSdbRow *mndCompactDetailActionDecode(SSdbRaw *pRaw) {
} }
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
if (tDeserializeSCompactDetailObj(buf, tlen, pCompact) < 0) { if ((terrno = tDeserializeSCompactDetailObj(buf, tlen, pCompact)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto OVER; goto OVER;
} }
//taosInitRWLatch(&pView->lock);
OVER: OVER:
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
@ -274,6 +271,7 @@ int32_t mndCompactDetailActionUpdate(SSdb *pSdb, SCompactDetailObj *pOldCompact,
int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SVgObj *pVgroup, int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SVgObj *pVgroup,
SVnodeGid *pVgid, int32_t index){ SVnodeGid *pVgid, int32_t index){
int32_t code = 0;
SCompactDetailObj compactDetail = {0}; SCompactDetailObj compactDetail = {0};
compactDetail.compactDetailId = index; compactDetail.compactDetailId = index;
compactDetail.compactId = pCompact->compactId; compactDetail.compactId = pCompact->compactId;
@ -292,9 +290,11 @@ int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* p
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw); sdbFreeRaw(pVgRaw);
return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
TAOS_RETURN(code);
} }
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
return 0; TAOS_RETURN(code);
} }

View File

@ -95,26 +95,21 @@ static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const ch
bool enableReplay) { bool enableReplay) {
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
int32_t numOfTopics = taosArrayGetSize(pTopicList); int32_t numOfTopics = taosArrayGetSize(pTopicList);
for (int32_t i = 0; i < numOfTopics; i++) { for (int32_t i = 0; i < numOfTopics; i++) {
char *pOneTopic = taosArrayGetP(pTopicList, i); char *pOneTopic = taosArrayGetP(pTopicList, i);
pTopic = mndAcquireTopic(pMnode, pOneTopic); pTopic = mndAcquireTopic(pMnode, pOneTopic);
if (pTopic == NULL) { // terrno has been set by callee function if (pTopic == NULL) { // terrno has been set by callee function
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto FAILED; goto FAILED;
} }
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { TAOS_CHECK_GOTO(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic), &lino, FAILED);
code = TSDB_CODE_MND_NO_RIGHTS;
terrno = TSDB_CODE_MND_NO_RIGHTS;
goto FAILED;
}
if ((terrno = grantCheckExpire(TSDB_GRANT_SUBSCRIPTION)) < 0) { TAOS_CHECK_GOTO(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION), &lino, FAILED);
code = terrno;
goto FAILED;
}
if (enableReplay) { if (enableReplay) {
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
@ -123,7 +118,8 @@ static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const ch
} else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) { } else if (pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) {
SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db); SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db);
if (pDb == NULL) { if (pDb == NULL) {
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto FAILED; goto FAILED;
} }
if (pDb->cfg.numOfVgroups != 1) { if (pDb->cfg.numOfVgroups != 1) {
@ -146,6 +142,7 @@ FAILED:
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont; SMqConsumerRecoverMsg *pRecoverMsg = pMsg->pCont;
SMqConsumerObj *pConsumerNew = NULL; SMqConsumerObj *pConsumerNew = NULL;
@ -153,7 +150,8 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pRecoverMsg->consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId); mError("cannot find consumer %" PRId64 " when processing consumer recover msg", pRecoverMsg->consumerId);
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto END; goto END;
} }
@ -161,31 +159,27 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
pConsumer->status, mndConsumerStatusName(pConsumer->status)); pConsumer->status, mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) {
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; code = TSDB_CODE_MND_CONSUMER_NOT_READY;
code = -1;
goto END; goto END;
} }
pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL); pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_UPDATE_REC, NULL, NULL);
if (pConsumerNew == NULL){ if (pConsumerNew == NULL){
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto END; goto END;
} }
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm"); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
if (pTrans == NULL) { if (pTrans == NULL) {
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
goto END; if (terrno != 0) code = terrno;
}
code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
if (code != 0) {
goto END; goto END;
} }
code = mndSetConsumerCommitLogs(pTrans, pConsumerNew); TAOS_CHECK_GOTO(validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false), &lino, END);
if (code != 0) {
goto END; TAOS_CHECK_GOTO(mndSetConsumerCommitLogs(pTrans, pConsumerNew), &lino, END);
}
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
END: END:
@ -197,6 +191,7 @@ END:
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerClearMsg *pClearMsg = pMsg->pCont; SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
SMqConsumerObj *pConsumerNew = NULL; SMqConsumerObj *pConsumerNew = NULL;
@ -205,7 +200,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("consumer:0x%" PRIx64 " failed to be found to clear it", pClearMsg->consumerId); mError("consumer:0x%" PRIx64 " failed to be found to clear it", pClearMsg->consumerId);
return 0; TAOS_RETURN(code);
} }
mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
@ -213,21 +208,20 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL); pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL);
if (pConsumerNew == NULL){ if (pConsumerNew == NULL){
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto END; goto END;
} }
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
if (pTrans == NULL) { if (pTrans == NULL) {
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto END; goto END;
} }
// this is the drop action, not the update action // this is the drop action, not the update action
code = mndSetConsumerDropLogs(pTrans, pConsumerNew); TAOS_CHECK_GOTO(mndSetConsumerDropLogs(pTrans, pConsumerNew), &lino, END);
if (code != 0) {
goto END;
}
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
@ -310,10 +304,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqHbRsp rsp = {0}; SMqHbRsp rsp = {0};
SMqConsumerObj *pConsumer = NULL; SMqConsumerObj *pConsumer = NULL;
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { TAOS_CHECK_GOTO(tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req), NULL, end);
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}
int64_t consumerId = req.consumerId; int64_t consumerId = req.consumerId;
pConsumer = mndAcquireConsumer(pMnode, consumerId); pConsumer = mndAcquireConsumer(pMnode, consumerId);
@ -322,10 +313,8 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
goto end; goto end;
} }
code = checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user);
if (code != 0) { TAOS_CHECK_GOTO(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user), NULL, end);
goto end;
}
atomic_store_32(&pConsumer->hbStatus, 0); atomic_store_32(&pConsumer->hbStatus, 0);
@ -343,7 +332,7 @@ end:
tDestroySMqHbRsp(&rsp); tDestroySMqHbRsp(&rsp);
mndReleaseConsumer(pMnode, pConsumer); mndReleaseConsumer(pMnode, pConsumer);
tDestroySMqHbReq(&req); tDestroySMqHbReq(&req);
return code; TAOS_RETURN(code);
} }
static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t epoch, SMqAskEpRsp *rsp){
@ -447,7 +436,7 @@ static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoc
pHead->walever = 0; pHead->walever = 0;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqAskEpRsp(&abuf, rsp); TAOS_CHECK_RETURN(tEncodeSMqAskEpRsp(&abuf, rsp));
// send rsp // send rsp
pMsg->info.rsp = buf; pMsg->info.rsp = buf;
@ -461,9 +450,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqAskEpRsp rsp = {0}; SMqAskEpRsp rsp = {0};
int32_t code = 0; int32_t code = 0;
if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { TAOS_CHECK_RETURN(tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req));
return TSDB_CODE_OUT_OF_MEMORY;
}
int64_t consumerId = req.consumerId; int64_t consumerId = req.consumerId;
@ -517,19 +504,29 @@ END:
} }
int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { int32_t mndSetConsumerDropLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
int32_t code = 0;
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) {
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; if (terrno != 0) code = terrno;
return 0; TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
TAOS_RETURN(code);
} }
int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) { int32_t mndSetConsumerCommitLogs(STrans *pTrans, SMqConsumerObj *pConsumer) {
int32_t code = 0;
SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer); SSdbRaw *pCommitRaw = mndConsumerActionEncode(pConsumer);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) {
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; if (terrno != 0) code = terrno;
return 0; TAOS_RETURN(code);
}
TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
TAOS_RETURN(code);
} }
static void freeItem(void *param) { static void freeItem(void *param) {
@ -592,8 +589,7 @@ static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
for (int i = 0; i < newTopicNum; i++) { for (int i = 0; i < newTopicNum; i++) {
int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i)); int32_t gNum = mndGetGroupNumByTopic(pMnode, (const char *)taosArrayGetP(pTopicList, i));
if (gNum >= MND_MAX_GROUP_PER_TOPIC) { if (gNum >= MND_MAX_GROUP_PER_TOPIC) {
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
return -1;
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -672,7 +668,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) { if (pTrans == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _over; goto _over;
} }
@ -683,14 +680,14 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew = buildSubConsumer(pMnode, &subscribe); pConsumerNew = buildSubConsumer(pMnode, &subscribe);
if(pConsumerNew == NULL){ if(pConsumerNew == NULL){
code = -1; code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _over; goto _over;
} }
code = mndSetConsumerCommitLogs(pTrans, pConsumerNew);
if (code != 0) goto _over;
code = mndTransPrepare(pMnode, pTrans); TAOS_CHECK_GOTO(mndSetConsumerCommitLogs(pTrans, pConsumerNew), NULL, _over);
if (code != 0) goto _over;
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _over);
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
_over: _over:

View File

@ -168,6 +168,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, "Invalid query id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, "Invalid connection id") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, "Invalid connection id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_DISABLED, "User is disabled") TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_DISABLED, "User is disabled")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_PLATFORM, "Unsupported feature on this platform") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_PLATFORM, "Unsupported feature on this platform")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_RETURN_VALUE_NULL, "Return value is null")
// mnode-sdb // mnode-sdb
TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_ALREADY_THERE, "Object already there") TAOS_DEFINE_ERROR(TSDB_CODE_SDB_OBJ_ALREADY_THERE, "Object already there")