enh: If an 'object is creating' error is reported when creating a database, wait for the transaction to end then response

This commit is contained in:
Shengliang Guan 2022-07-19 12:00:43 +08:00
parent 631eec563d
commit 83ac069bef
7 changed files with 131 additions and 67 deletions

View File

@ -35,6 +35,46 @@
extern "C" { extern "C" {
#endif #endif
typedef enum {
MND_OPER_CONNECT = 1,
MND_OPER_CREATE_ACCT,
MND_OPER_DROP_ACCT,
MND_OPER_ALTER_ACCT,
MND_OPER_CREATE_USER,
MND_OPER_DROP_USER,
MND_OPER_ALTER_USER,
MND_OPER_CREATE_BNODE,
MND_OPER_DROP_BNODE,
MND_OPER_CREATE_DNODE,
MND_OPER_DROP_DNODE,
MND_OPER_CONFIG_DNODE,
MND_OPER_CREATE_MNODE,
MND_OPER_DROP_MNODE,
MND_OPER_CREATE_QNODE,
MND_OPER_DROP_QNODE,
MND_OPER_CREATE_SNODE,
MND_OPER_DROP_SNODE,
MND_OPER_REDISTRIBUTE_VGROUP,
MND_OPER_MERGE_VGROUP,
MND_OPER_SPLIT_VGROUP,
MND_OPER_BALANCE_VGROUP,
MND_OPER_CREATE_FUNC,
MND_OPER_DROP_FUNC,
MND_OPER_KILL_TRANS,
MND_OPER_KILL_CONN,
MND_OPER_KILL_QUERY,
MND_OPER_CREATE_DB,
MND_OPER_ALTER_DB,
MND_OPER_DROP_DB,
MND_OPER_COMPACT_DB,
MND_OPER_TRIM_DB,
MND_OPER_USE_DB,
MND_OPER_WRITE_DB,
MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIBALES,
} EOperType;
typedef enum { typedef enum {
MND_AUTH_ACCT_START = 0, MND_AUTH_ACCT_START = 0,
MND_AUTH_ACCT_USER, MND_AUTH_ACCT_USER,
@ -109,9 +149,9 @@ typedef struct {
ETrnPolicy policy; ETrnPolicy policy;
ETrnConflct conflict; ETrnConflct conflict;
ETrnExec exec; ETrnExec exec;
EOperType oper;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
SRpcHandleInfo rpcInfo;
void* rpcRsp; void* rpcRsp;
int32_t rpcRspLen; int32_t rpcRspLen;
int32_t redoActionPos; int32_t redoActionPos;
@ -130,6 +170,7 @@ typedef struct {
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;
void* param; void* param;
SArray* pRpcArray;
} STrans; } STrans;
typedef struct { typedef struct {

View File

@ -22,46 +22,6 @@
extern "C" { extern "C" {
#endif #endif
typedef enum {
MND_OPER_CONNECT = 1,
MND_OPER_CREATE_ACCT,
MND_OPER_DROP_ACCT,
MND_OPER_ALTER_ACCT,
MND_OPER_CREATE_USER,
MND_OPER_DROP_USER,
MND_OPER_ALTER_USER,
MND_OPER_CREATE_BNODE,
MND_OPER_DROP_BNODE,
MND_OPER_CREATE_DNODE,
MND_OPER_DROP_DNODE,
MND_OPER_CONFIG_DNODE,
MND_OPER_CREATE_MNODE,
MND_OPER_DROP_MNODE,
MND_OPER_CREATE_QNODE,
MND_OPER_DROP_QNODE,
MND_OPER_CREATE_SNODE,
MND_OPER_DROP_SNODE,
MND_OPER_REDISTRIBUTE_VGROUP,
MND_OPER_MERGE_VGROUP,
MND_OPER_SPLIT_VGROUP,
MND_OPER_BALANCE_VGROUP,
MND_OPER_CREATE_FUNC,
MND_OPER_DROP_FUNC,
MND_OPER_KILL_TRANS,
MND_OPER_KILL_CONN,
MND_OPER_KILL_QUERY,
MND_OPER_CREATE_DB,
MND_OPER_ALTER_DB,
MND_OPER_DROP_DB,
MND_OPER_COMPACT_DB,
MND_OPER_TRIM_DB,
MND_OPER_USE_DB,
MND_OPER_WRITE_DB,
MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIBALES,
} EOperType;
int32_t mndInitPrivilege(SMnode *pMnode); int32_t mndInitPrivilege(SMnode *pMnode);
void mndCleanupPrivilege(SMnode *pMnode); void mndCleanupPrivilege(SMnode *pMnode);

View File

@ -73,12 +73,14 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2); void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2);
void mndTransSetSerial(STrans *pTrans); void mndTransSetSerial(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
int32_t mndTransProcessRsp(SRpcMsg *pRsp); int32_t mndTransProcessRsp(SRpcMsg *pRsp);
void mndTransPullup(SMnode *pMnode); void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans); int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
void mndTransExecute(SMnode *pMnode, STrans *pTrans); void mndTransExecute(SMnode *pMnode, STrans *pTrans);
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -487,6 +487,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbName(pTrans, dbObj.name, NULL); mndTransSetDbName(pTrans, dbObj.name, NULL);
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
@ -534,6 +535,14 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_DB_ALREADY_EXIST; terrno = TSDB_CODE_MND_DB_ALREADY_EXIST;
goto _OVER; goto _OVER;
} }
} else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
if (mndSetRpcInfoForDbTrans(pMnode, pReq, MND_OPER_CREATE_DB, createReq.db) == 0) {
mDebug("db:%s, is creating and response after trans finished", createReq.db);
code = TSDB_CODE_ACTION_IN_PROGRESS;
goto _OVER;
} else {
goto _OVER;
}
} else if (terrno != TSDB_CODE_MND_DB_NOT_EXIST) { } else if (terrno != TSDB_CODE_MND_DB_NOT_EXIST) {
goto _OVER; goto _OVER;
} }

View File

@ -122,6 +122,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER) SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->oper, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
@ -269,15 +273,22 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int8_t policy = 0; int8_t policy = 0;
int8_t conflict = 0; int8_t conflict = 0;
int8_t exec = 0; int8_t exec = 0;
int8_t oper = 0;
int8_t reserved = 0;
int8_t actionType = 0; int8_t actionType = 0;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER) SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
SDB_GET_INT8(pRaw, dataPos, &policy, _OVER) SDB_GET_INT8(pRaw, dataPos, &policy, _OVER)
SDB_GET_INT8(pRaw, dataPos, &conflict, _OVER) SDB_GET_INT8(pRaw, dataPos, &conflict, _OVER)
SDB_GET_INT8(pRaw, dataPos, &exec, _OVER) SDB_GET_INT8(pRaw, dataPos, &exec, _OVER)
SDB_GET_INT8(pRaw, dataPos, &oper, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
pTrans->stage = stage; pTrans->stage = stage;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->conflict = conflict; pTrans->conflict = conflict;
pTrans->exec = exec; pTrans->exec = exec;
pTrans->oper = oper;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
@ -495,6 +506,10 @@ static void mndTransDropData(STrans *pTrans) {
mndTransDropActions(pTrans->commitActions); mndTransDropActions(pTrans->commitActions);
pTrans->commitActions = NULL; pTrans->commitActions = NULL;
} }
if (pTrans->pRpcArray != NULL) {
taosArrayDestroy(pTrans->pRpcArray);
pTrans->pRpcArray = NULL;
}
if (pTrans->rpcRsp != NULL) { if (pTrans->rpcRsp != NULL) {
taosMemoryFree(pTrans->rpcRsp); taosMemoryFree(pTrans->rpcRsp);
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
@ -585,14 +600,18 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL) { if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
pTrans->pRpcArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create transaction since %s", terrstr()); mError("failed to create transaction since %s", terrstr());
return NULL; return NULL;
} }
if (pReq != NULL) pTrans->rpcInfo = pReq->info; if (pReq != NULL) {
taosArrayPush(pTrans->pRpcArray, &pReq->info);
}
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans); mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
@ -677,6 +696,31 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
pTrans->paramLen = paramLen; pTrans->paramLen = paramLen;
} }
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname) {
STrans *pTrans = NULL;
void *pIter = NULL;
int32_t code = -1;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) break;
if (pTrans->oper == oper) {
if (strcasecmp(dbname, pTrans->dbname1) == 0) {
mDebug("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
if (taosArrayPush(pTrans->pRpcArray, &pMsg->info) != NULL) {
code = 0;
}
sdbRelease(pMnode->pSdb, pTrans);
break;
}
}
sdbRelease(pMnode->pSdb, pTrans);
}
return code;
}
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) { void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
if (dbname1 != NULL) { if (dbname1 != NULL) {
memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN); memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN);
@ -688,6 +732,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2)
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
@ -711,7 +757,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
static bool mndCheckDbConflict(const char *db, STrans *pTrans) { static bool mndCheckDbConflict(const char *db, STrans *pTrans) {
if (db[0] == 0) return false; if (db[0] == 0) return false;
if (strcmp(db, pTrans->dbname1) == 0 || strcmp(db, pTrans->dbname2) == 0) return true; if (strcasecmp(db, pTrans->dbname1) == 0 || strcasecmp(db, pTrans->dbname2) == 0) return true;
return false; return false;
} }
@ -784,9 +830,10 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1; return -1;
} }
pNew->rpcInfo = pTrans->rpcInfo; pNew->pRpcArray = pTrans->pRpcArray;
pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen; pNew->rpcRspLen = pTrans->rpcRspLen;
pTrans->pRpcArray = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0; pTrans->rpcRspLen = 0;
@ -835,13 +882,20 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} }
} }
if (sendRsp && pTrans->rpcInfo.handle != NULL) { if (!sendRsp) return;
int32_t size = taosArrayGetSize(pTrans->pRpcArray);
if (size <= 0) return;
for (int32_t i = 0; i < size; ++i) {
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
if (pInfo->handle != NULL) {
mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage), mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pTrans->rpcInfo.ahandle); pInfo->ahandle);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL; code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
} }
SRpcMsg rspMsg = {.code = code, .info = pTrans->rpcInfo}; SRpcMsg rspMsg = {.code = code, .info = *pInfo};
if (pTrans->rpcRspLen != 0) { if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen); void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
@ -850,15 +904,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
rspMsg.pCont = rpcCont; rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen; rspMsg.contLen = pTrans->rpcRspLen;
} }
taosMemoryFree(pTrans->rpcRsp);
} }
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
pTrans->rpcInfo.handle = NULL;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
} }
} }
taosArrayClear(pTrans->pRpcArray);
}
int32_t mndTransProcessRsp(SRpcMsg *pRsp) { int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
SMnode *pMnode = pRsp->info.node; SMnode *pMnode = pRsp->info.node;

View File

@ -152,7 +152,7 @@ endi
system_content sh/checkValgrind.sh -n dnode2 system_content sh/checkValgrind.sh -n dnode2
print cmd return result ----> [ $system_content ] print cmd return result ----> [ $system_content ]
if $system_content > 4 then if $system_content > 0 then
return -1 return -1
endi endi