Merge pull request #28158 from taosdata/fix/TD-32358
fix double send resp
This commit is contained in:
commit
2769258c52
|
@ -1012,10 +1012,10 @@ _OVER:
|
|||
|
||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
mndTransSetDbName(pTrans, pDb->name, pStb->name);
|
||||
TAOS_CHECK_RETURN (mndTransCheckConflict(pMnode, pTrans));
|
||||
TAOS_CHECK_RETURN (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
|
||||
TAOS_CHECK_RETURN(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN(mndSetCreateStbRedoActions(pMnode, pTrans, pDb, pStb));
|
||||
TAOS_CHECK_RETURN(mndSetCreateStbUndoActions(pMnode, pTrans, pDb, pStb));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1051,7 +1051,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
|
|||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
|
||||
|
@ -1500,8 +1500,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
|||
|
||||
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
|
@ -1562,8 +1562,8 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
|||
|
||||
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SStreamObj *pStream = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
|
@ -1616,8 +1616,8 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
|
|||
|
||||
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SSmaObj *pSma = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
|
@ -2233,7 +2233,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
|
|||
|
||||
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bool *schema, bool *sma) {
|
||||
int32_t code = 0;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
|
||||
|
@ -2278,7 +2278,7 @@ static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion *pStbVer, bo
|
|||
|
||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
||||
|
@ -2302,7 +2302,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
|
|||
|
||||
static int32_t mndBuildStbCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
|
||||
int32_t code = 0;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
||||
|
@ -2656,7 +2656,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
|
|||
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
SName name = {0};
|
||||
SName name = {0};
|
||||
int32_t ret = 0;
|
||||
if ((ret = tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
|
||||
mError("stb:%s, failed to tNameFromString since %s", alterReq.name, tstrerror(ret));
|
||||
|
@ -2779,8 +2779,8 @@ _OVER:
|
|||
|
||||
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
|
@ -2839,8 +2839,8 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
|||
|
||||
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SStreamObj *pStream = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
|
@ -2945,7 +2945,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
|||
code = mndDropStb(pMnode, pReq, pDb, pStb);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
||||
SName name = {0};
|
||||
SName name = {0};
|
||||
int32_t ret = 0;
|
||||
if ((ret = tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)) != 0)
|
||||
mError("stb:%s, failed to tNameFromString since %s", dropReq.name, tstrerror(ret));
|
||||
|
@ -3016,7 +3016,7 @@ _OVER:
|
|||
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
tFreeSTableMetaRsp(&metaRsp);
|
||||
//TODO change to TAOS_RETURN
|
||||
// TODO change to TAOS_RETURN
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -3562,7 +3562,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
|||
|
||||
SName name = {0};
|
||||
|
||||
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
|
||||
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
|
||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
@ -4259,7 +4259,9 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
|
|||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
||||
if (code) goto _OVER;
|
||||
}
|
||||
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0;
|
||||
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
_OVER:
|
||||
tFreeSMDropTbsReq(&dropReq);
|
||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||
|
@ -4458,7 +4460,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
|
|||
|
||||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
||||
if (code) goto _end;
|
||||
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0;
|
||||
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
_end:
|
||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||
tDecoderClear(&decoder);
|
||||
|
|
Loading…
Reference in New Issue