From 607ca02f30936444dc20e7890d385262cd966a41 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 29 Jul 2020 05:29:54 +0000 Subject: [PATCH] [TD-997] --- src/inc/mnode.h | 1 + src/mnode/inc/mnodeSdb.h | 12 ++- src/mnode/src/mnodeAcct.c | 2 +- src/mnode/src/mnodeCluster.c | 11 +- src/mnode/src/mnodeDb.c | 41 ++++---- src/mnode/src/mnodeDnode.c | 17 +-- src/mnode/src/mnodeMnode.c | 9 +- src/mnode/src/mnodeSdb.c | 72 +++++++++++-- src/mnode/src/mnodeTable.c | 198 ++++++++++++++++------------------- src/mnode/src/mnodeUser.c | 14 +-- src/mnode/src/mnodeVgroup.c | 70 +++++++++---- 11 files changed, 252 insertions(+), 195 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 01358fdb44..03a25e0f92 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -42,6 +42,7 @@ typedef struct SMnodeMsg { int8_t expected; int8_t retry; int32_t code; + void * pObj; struct SAcctObj * pAcct; struct SDnodeObj *pDnode; struct SUserObj * pUser; diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index cadd1b1f3d..f4854f69a0 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -47,15 +47,16 @@ typedef enum { SDB_OPER_LOCAL } ESdbOper; -typedef struct { +typedef struct SSdbOper { ESdbOper type; - void * table; - void * pObj; - void * rowData; int32_t rowSize; int32_t retCode; // for callback in sdb queue int32_t processedCount; // for sync fwd callback - int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); + int32_t (*reqFp)(struct SMnodeMsg *pMsg); + int32_t (*writeCb)(struct SMnodeMsg *pMsg, int32_t code); + void * table; + void * pObj; + void * rowData; struct SMnodeMsg *pMsg; } SSdbOper; @@ -86,6 +87,7 @@ void sdbUpdateMnodeRoles(); int32_t sdbInsertRow(SSdbOper *pOper); int32_t sdbDeleteRow(SSdbOper *pOper); int32_t sdbUpdateRow(SSdbOper *pOper); +int32_t sdbInsertRowImp(SSdbOper *pOper); void *sdbGetRow(void *handle, void *key); void *sdbFetchRow(void *handle, void *pIter, void **ppRow); diff --git a/src/mnode/src/mnodeAcct.c b/src/mnode/src/mnodeAcct.c index 287e8bb723..c40a696ede 100644 --- a/src/mnode/src/mnodeAcct.c +++ b/src/mnode/src/mnodeAcct.c @@ -85,7 +85,7 @@ static int32_t mnodeAcctActionRestored() { if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mInfo("dnode first deploy, create root acct"); int32_t code = mnodeCreateRootAcct(); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("failed to create root account, reason:%s", tstrerror(code)); return code; } diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index 8933107dd1..41727712b5 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -71,7 +71,7 @@ static int32_t mnodeClusterActionRestored() { if (numOfRows <= 0 && dnodeIsFirstDeploy()) { mInfo("dnode first deploy, create cluster"); int32_t code = mnodeCreateCluster(); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("failed to create cluster, reason:%s", tstrerror(code)); return code; } @@ -159,16 +159,15 @@ int32_t mnodeGetClusterId() { void mnodeUpdateClusterId() { SClusterObj *pCluster = NULL; - mnodeGetNextCluster(NULL, &pCluster); + void *pIter = mnodeGetNextCluster(NULL, &pCluster); if (pCluster != NULL) { tsClusterId = pCluster->clusterId; - mnodeDecClusterRef(pCluster); mInfo("cluster id is %d", tsClusterId); - } else { - //assert(false); } -} + mnodeDecClusterRef(pCluster); + sdbFreeIter(pIter); +} static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 199cc63da6..82779e7d69 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -324,8 +324,10 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) { SDbObj *pDb = pMsg->pDb; - if (pDb != NULL) { + if (code == TSDB_CODE_SUCCESS) { mLInfo("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + } else { + mError("db:%s, failed to create by %s, reason:%s", pDb->name, mnodeGetUserFromMsg(pMsg), tstrerror(code)); } return code; @@ -386,17 +388,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs .pObj = pDb, .rowSize = sizeof(SDbObj), .pMsg = pMsg, - .cb = mnodeCreateDbCb + .writeCb = mnodeCreateDbCb }; code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - mLInfo("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code)); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code)); mnodeDestroyDb(pDb); - return code; - } else { - return TSDB_CODE_MND_ACTION_IN_PROGRESS; } + + return code; } bool mnodeCheckIsMonitorDB(char *db, char *monitordb) { @@ -754,8 +755,8 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) { }; int32_t code = sdbUpdateRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code)); } return code; @@ -947,12 +948,12 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) { .table = tsDbSdb, .pObj = pDb, .pMsg = pMsg, - .cb = mnodeAlterDbCb + .writeCb = mnodeAlterDbCb }; code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code)); } } @@ -995,16 +996,16 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { mInfo("db:%s, drop db from sdb", pDb->name); SSdbOper oper = { - .type = SDB_OPER_GLOBAL, - .table = tsDbSdb, - .pObj = pDb, - .pMsg = pMsg, - .cb = mnodeDropDbCb + .type = SDB_OPER_GLOBAL, + .table = tsDbSdb, + .pObj = pDb, + .pMsg = pMsg, + .writeCb = mnodeDropDbCb }; int32_t code = sdbDeleteRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code)); } return code; @@ -1031,7 +1032,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { } int32_t code = mnodeSetDbDropping(pMsg->pDb); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); return code; } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index aa05eade24..0b4162c772 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -261,7 +261,8 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) { .pObj = pDnode }; - if (sdbUpdateRow(&oper) != 0) { + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("dnodeId:%d, failed update", pDnode->dnodeId); } } @@ -501,13 +502,12 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { }; int32_t code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { int dnodeId = pDnode->dnodeId; tfree(pDnode); - mError("failed to create dnode:%d, result:%s", dnodeId, tstrerror(code)); + mError("failed to create dnode:%d, reason:%s", dnodeId, tstrerror(code)); } else { - mInfo("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code)); - if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + mLInfo("dnode:%d is created", pDnode->dnodeId); } return code; @@ -522,9 +522,10 @@ int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) { }; int32_t code = sdbDeleteRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - mLInfo("dnode:%d, is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); - if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); + } else { + mLInfo("dnode:%d, is dropped from cluster", pDnode->dnodeId); } return code; diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 2fff885fca..5f82a9afad 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "tglobal.h" #include "trpc.h" #include "tsync.h" #include "tbalance.h" @@ -31,8 +32,6 @@ #include "mnodeShow.h" #include "mnodeUser.h" -#include "tglobal.h" - static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; static SRpcEpSet tsMnodeEpSetForShell; @@ -279,9 +278,8 @@ int32_t mnodeAddMnode(int32_t dnodeId) { }; int32_t code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { tfree(pMnode); - code = TSDB_CODE_MND_SDB_ERROR; } mnodeUpdateMnodeEpSet(); @@ -313,9 +311,6 @@ int32_t mnodeDropMnode(int32_t dnodeId) { }; int32_t code = sdbDeleteRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_SDB_ERROR; - } sdbDecRef(tsMnodeSdb, pMnode); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 26efcfeac0..374430879d 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -101,6 +101,11 @@ static int32_t sdbInitWriteWorker(); static void sdbCleanupWriteWorker(); static int32_t sdbAllocWriteQueue(); static void sdbFreeWritequeue(); +static int32_t sdbUpdateRowImp(SSdbOper *pOper); +static int32_t sdbDeleteRowImp(SSdbOper *pOper); +static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper); +static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper); +static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper); int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; @@ -260,8 +265,20 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { tstrerror(code)); } - if (pOper->cb != NULL) { - pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode); + // failed to forward, need revert insert + if (pOper->retCode != TSDB_CODE_SUCCESS) { + SWalHead *pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; + int32_t action = pHead->msgType % 10; + sdbError("table:%s record:%p:%s ver:%" PRIu64 ", action:%d failed to foward reason:%s", + ((SSdbTable *)pOper->table)->tableName, pOper->pObj, sdbGetKeyStr(pOper->table, pHead->cont), + pHead->version, action, tstrerror(pOper->retCode)); + if (action == SDB_ACTION_INSERT) { + sdbDeleteHash(pOper->table, pOper); + } + } + + if (pOper->writeCb != NULL) { + pOper->retCode = (*pOper->writeCb)(pMsg, pOper->retCode); } dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); @@ -269,6 +286,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { if (ahandle == NULL) { sdbDecRef(pOper->table, pOper->pObj); } + taosFreeQitem(pOper); } @@ -609,7 +627,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) { if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (sdbGetRowFromObj(pTable, pOper->pObj)) { - sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj)); + sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, + sdbGetKeyStrFromObj(pTable, pOper->pObj)); sdbDecRef(pTable, pOper->pObj); return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE; } @@ -634,9 +653,20 @@ int32_t sdbInsertRow(SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } + if (pOper->reqFp) { + return (*pOper->reqFp)(pOper->pMsg); + } else { + return sdbInsertRowImp(pOper); + } +} + +int32_t sdbInsertRowImp(SSdbOper *pOper) { + SSdbTable *pTable = (SSdbTable *)pOper->table; + if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; SSdbOper *pNewOper = taosAllocateQitem(size); - + SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; pHead->version = 0; pHead->len = pOper->rowSize; @@ -655,7 +685,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) { sdbIncRef(pNewOper->table, pNewOper->pObj); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); - return TSDB_CODE_SUCCESS; + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { @@ -664,7 +695,6 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { int32_t *updateEnd = pRow + pTable->refCountPos - 4; return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1; - // return (*updateEnd == 1); } int32_t sdbDeleteRow(SSdbOper *pOper) { @@ -692,13 +722,24 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } + if (pOper->reqFp) { + return (*pOper->reqFp)(pOper->pMsg); + } else { + return sdbDeleteRowImp(pOper); + } +} + +int32_t sdbDeleteRowImp(SSdbOper *pOper) { + SSdbTable *pTable = (SSdbTable *)pOper->table; + if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; SSdbOper *pNewOper = taosAllocateQitem(size); SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; pHead->version = 0; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; - + pOper->rowData = pHead->cont; (*pTable->encodeFp)(pOper); pHead->len = pOper->rowSize; @@ -711,7 +752,8 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { } taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); - return TSDB_CODE_SUCCESS; + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } int32_t sdbUpdateRow(SSdbOper *pOper) { @@ -735,6 +777,17 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } + if (pOper->reqFp) { + return (*pOper->reqFp)(pOper->pMsg); + } else { + return sdbUpdateRowImp(pOper); + } +} + +int32_t sdbUpdateRowImp(SSdbOper *pOper) { + SSdbTable *pTable = (SSdbTable *)pOper->table; + if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; SSdbOper *pNewOper = taosAllocateQitem(size); @@ -755,7 +808,8 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { sdbIncRef(pNewOper->table, pNewOper->pObj); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); - return TSDB_CODE_SUCCESS; + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 053a1522a7..df80fb1c47 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -865,18 +865,17 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { .pObj = pStable, .rowSize = sizeof(SSuperTableObj) + schemaSize, .pMsg = pMsg, - .cb = mnodeCreateSuperTableCb + .writeCb = mnodeCreateSuperTableCb }; int32_t code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeDestroySuperTable(pStable); pMsg->pTable = NULL; mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); - return code; - } else { - return TSDB_CODE_MND_ACTION_IN_PROGRESS; } + + return code; } static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) { @@ -924,13 +923,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { .table = tsSuperTableSdb, .pObj = pStable, .pMsg = pMsg, - .cb = mnodeDropSuperTableCb + .writeCb = mnodeDropSuperTableCb }; int32_t code = sdbDeleteRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId, + tstrerror(code)); } + return code; } @@ -995,15 +996,10 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t .table = tsSuperTableSdb, .pObj = pStable, .pMsg = pMsg, - .cb = mnodeAddSuperTableTagCb + .writeCb = mnodeAddSuperTableTagCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { @@ -1034,15 +1030,10 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { .table = tsSuperTableSdb, .pObj = pStable, .pMsg = pMsg, - .cb = mnodeDropSuperTableTagCb + .writeCb = mnodeDropSuperTableTagCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) { @@ -1083,15 +1074,10 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c .table = tsSuperTableSdb, .pObj = pStable, .pMsg = pMsg, - .cb = mnodeModifySuperTableTagNameCb + .writeCb = mnodeModifySuperTableTagNameCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) { @@ -1162,15 +1148,10 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 .table = tsSuperTableSdb, .pObj = pStable, .pMsg = pMsg, - .cb = mnodeAddSuperTableColumnCb + .writeCb = mnodeAddSuperTableColumnCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { @@ -1212,15 +1193,10 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { .table = tsSuperTableSdb, .pObj = pStable, .pMsg = pMsg, - .cb = mnodeDropSuperTableColumnCb + .writeCb = mnodeDropSuperTableColumnCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { @@ -1261,15 +1237,10 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char .table = tsSuperTableSdb, .pObj = pStable, .pMsg = pMsg, - .cb = mnodeChangeSuperTableColumnCb + .writeCb = mnodeChangeSuperTableColumnCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } // show super tables @@ -1645,20 +1616,12 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO return pCreate; } -static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { +static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; assert(pTable); - if (code == TSDB_CODE_SUCCESS) { - mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, - pMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, tstrerror(code)); - } else { - mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, - pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); - SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb}; - sdbDeleteRow(&desc); - return code; - } + mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid); SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); @@ -1679,6 +1642,34 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; + assert(pTable); + + if (code == TSDB_CODE_SUCCESS) { + if (pCreate->getMeta) { + mDebug("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pMsg->rpcMsg.handle); + + pMsg->retry = 0; + dnodeReprocessMnodeWriteMsg(pMsg); + } else { + mDebug("app:%p:%p, table:%s, created in dnode, thandle:%p", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + pMsg->rpcMsg.handle); + + dnodeSendRpcMnodeWriteRsp(pMsg, TSDB_CODE_SUCCESS); + } + return TSDB_CODE_MND_ACTION_IN_PROGRESS; + } else { + mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg, + pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code)); + SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb}; + sdbDeleteRow(&desc); + return code; + } +} + static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { SVgObj *pVgroup = pMsg->pVgroup; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; @@ -1752,23 +1743,23 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { pMsg->pTable = (STableObj *)pTable; mnodeIncTableRef(pMsg->pTable); - SSdbOper desc = {0}; - desc.type = SDB_OPER_GLOBAL; - desc.pObj = pTable; - desc.table = tsChildTableSdb; - desc.pMsg = pMsg; - desc.cb = mnodeDoCreateChildTableCb; + SSdbOper desc = { + .type = SDB_OPER_GLOBAL, + .pObj = pTable, + .table = tsChildTableSdb, + .pMsg = pMsg, + .reqFp = mnodeDoCreateChildTableFp + }; int32_t code = sdbInsertRow(&desc); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mnodeDestroyChildTable(pTable); pMsg->pTable = NULL; - mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, + mError("app:%p:%p, table:%s, failed to create, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, tstrerror(code)); - return code; - } else { - return TSDB_CODE_MND_ACTION_IN_PROGRESS; } + + return code; } static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { @@ -1813,7 +1804,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { return terrno; } else { mDebug("app:%p:%p, table:%s, send create msg to vnode again", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); - return mnodeDoCreateChildTableCb(pMsg, TSDB_CODE_SUCCESS); + return mnodeDoCreateChildTableFp(pMsg); } } @@ -1878,13 +1869,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { .table = tsChildTableSdb, .pObj = pTable, .pMsg = pMsg, - .cb = mnodeDropChildTableCb + .writeCb = mnodeDropChildTableCb }; int32_t code = sdbDeleteRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, + tstrerror(code)); } + return code; } @@ -1980,15 +1973,10 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 .table = tsChildTableSdb, .pObj = pTable, .pMsg = pMsg, - .cb = mnodeAlterNormalTableColumnCb + .writeCb = mnodeAlterNormalTableColumnCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { @@ -2018,15 +2006,10 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { .table = tsChildTableSdb, .pObj = pTable, .pMsg = pMsg, - .cb = mnodeAlterNormalTableColumnCb + .writeCb = mnodeAlterNormalTableColumnCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) { @@ -2060,15 +2043,10 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char .table = tsChildTableSdb, .pObj = pTable, .pMsg = pMsg, - .cb = mnodeAlterNormalTableColumnCb + .writeCb = mnodeAlterNormalTableColumnCb }; - int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; - } - - return code; + return sdbUpdateRow(&oper); } static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) { @@ -2374,19 +2352,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { } if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont; - if (pCreate->getMeta) { - mDebug("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p result:%s", - mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->rpcMsg.handle, - tstrerror(rpcMsg->code)); - - mnodeMsg->retry = 0; - dnodeReprocessMnodeWriteMsg(mnodeMsg); - } else { - mDebug("app:%p:%p, table:%s, created in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, - pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); - - dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); + SSdbOper desc = { + .type = SDB_OPER_GLOBAL, + .pObj = pTable, + .table = tsChildTableSdb, + .pMsg = mnodeMsg, + .writeCb = mnodeDoCreateChildTableCb + }; + + int32_t code = sdbInsertRowImp(&desc); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mnodeMsg->pTable = NULL; + mnodeDestroyChildTable(pTable); + dnodeSendRpcMnodeWriteRsp(mnodeMsg, code); } } else { if (mnodeMsg->retry++ < 10) { diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index 8c783eebaf..a875cff4a2 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -182,9 +182,10 @@ static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { }; int32_t code = sdbUpdateRow(&oper); - if (code == TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); + } else { mLInfo("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); - if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; @@ -236,11 +237,11 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { }; code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); tfree(pUser); } else { mLInfo("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); - if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; @@ -255,9 +256,10 @@ static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) { }; int32_t code = sdbDeleteRow(&oper); - if (code == TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code)); + } else { mLInfo("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); - if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 089168c31e..df5df8c94a 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -256,7 +256,8 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { .pObj = pVgroup }; - if (sdbUpdateRow(&oper) != TSDB_CODE_SUCCESS) { + int32_t code = sdbUpdateRow(&oper); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("vgId:%d, failed to update vgroup", pVgroup->vgId); } mnodeSendAlterVgroupMsg(pVgroup); @@ -476,23 +477,11 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) { return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup); } -static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { +static int32_t mnodeCreateVgroupFp(SMnodeMsg *pMsg) { SVgObj *pVgroup = pMsg->pVgroup; SDbObj *pDb = pMsg->pDb; assert(pVgroup); - if (code != TSDB_CODE_SUCCESS) { - mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, - tstrerror(code)); - SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; - sdbDeleteRow(&desc); - return code; - } else { - pVgroup->status = TAOS_VG_STATUS_READY; - SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; - (void)sdbUpdateRow(&desc); - } - mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { @@ -508,6 +497,29 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { + SVgObj *pVgroup = pMsg->pVgroup; + SDbObj *pDb = pMsg->pDb; + assert(pVgroup); + + if (code != TSDB_CODE_SUCCESS) { + mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, + tstrerror(code)); + SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + sdbDeleteRow(&desc); + return code; + } else { + mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, + pDb->name, pVgroup->numOfVnodes); + pVgroup->status = TAOS_VG_STATUS_READY; + SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb}; + (void)sdbUpdateRow(&desc); + + dnodeReprocessMnodeWriteMsg(pMsg); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; + } +} + int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; SDbObj *pDb = pMsg->pDb; @@ -527,20 +539,18 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { mnodeIncVgroupRef(pVgroup); SSdbOper oper = { - .type = SDB_OPER_GLOBAL, - .table = tsVgroupSdb, - .pObj = pVgroup, + .type = SDB_OPER_GLOBAL, + .table = tsVgroupSdb, + .pObj = pVgroup, .rowSize = sizeof(SVgObj), - .pMsg = pMsg, - .cb = mnodeCreateVgroupCb + .pMsg = pMsg, + .reqFp = mnodeCreateVgroupFp }; int32_t code = sdbInsertRow(&oper); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { pMsg->pVgroup = NULL; mnodeDestroyVgroup(pVgroup); - } else { - code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } return code; @@ -891,7 +901,21 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { if (mnodeMsg->received != mnodeMsg->expected) return; if (mnodeMsg->received == mnodeMsg->successed) { - dnodeReprocessMnodeWriteMsg(mnodeMsg); + SSdbOper oper = { + .type = SDB_OPER_GLOBAL, + .table = tsVgroupSdb, + .pObj = pVgroup, + .rowSize = sizeof(SVgObj), + .pMsg = mnodeMsg, + .writeCb = mnodeCreateVgroupCb + }; + + int32_t code = sdbInsertRowImp(&oper); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mnodeMsg->pVgroup = NULL; + mnodeDestroyVgroup(pVgroup); + dnodeSendRpcMnodeWriteRsp(mnodeMsg, code); + } } else { SSdbOper oper = { .type = SDB_OPER_GLOBAL,