From c0b8234ffe7b06d688dfa823f67a5df8fd21b61a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 8 Feb 2022 17:08:40 +0800 Subject: [PATCH] update stb --- source/dnode/mnode/impl/src/mndStb.c | 133 +++++++++++++++++++++------ 1 file changed, 104 insertions(+), 29 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ba0b5aac9d..bb19d906fb 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -465,8 +465,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { SStbObj stbObj = {0}; - tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); - tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN); + memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); + memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN); stbObj.createdTime = taosGetTimestampMs(); stbObj.updateTime = stbObj.createdTime; stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); @@ -477,17 +477,13 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr stbObj.numOfTags = pCreate->numOfTags; stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema)); - if (stbObj.pColumns == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema)); - stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema)); - if (stbObj.pTags == NULL) { + if (stbObj.pColumns == NULL || stbObj.pTags == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + + memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema)); memcpy(stbObj.pTags, pCreate->pSchema + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema)); for (int32_t i = 0; i < stbObj.numOfColumns; ++i) { @@ -853,41 +849,119 @@ static int32_t mndChangeSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, con return 0; } -static int32_t mndUpdateStb(const SMAlterStbReq *pAlter, const SStbObj *pOld, SStbObj *pNew) { - int32_t code = 0; + +static int32_t mndSetUpdateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdbRaw *pRedoRaw = mndStbActionEncode(pStb); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; + + return 0; +} + +static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdbRaw *pCommitRaw = mndStbActionEncode(pStb); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + + +static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + int32_t contLen; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pDb->uid) continue; + + void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); + if (pReq == NULL) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_CREATE_STB; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pReq); + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + return -1; + } + sdbRelease(pSdb, pVgroup); + } + + return 0; +} + +static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { + SStbObj stbObj = {0}; + taosRLockLatch(&pOld->lock); + memcpy(&stbObj, pOld, sizeof(SStbObj)); + stbObj.pColumns = NULL; + stbObj.pTags = NULL; + stbObj.updateTime = taosGetTimestampMs(); + taosRUnLockLatch(&pOld->lock); + + int32_t code = -1; switch (pAlter->alterType) { case TSDB_ALTER_TABLE_ADD_TAG_COLUMN: - code = mndAddSuperTableTag(pOld, pNew, pAlter->pSchema, 1); + code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pSchema, 1); break; case TSDB_ALTER_TABLE_DROP_TAG_COLUMN: - code = mndDropSuperTableTag(pOld, pNew, pAlter->pSchema[0].name); + code = mndDropSuperTableTag(pOld, &stbObj, pAlter->pSchema[0].name); break; case TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN: - code = mndModifySuperTableTagName(pOld, pNew, pAlter->pSchema[0].name, pAlter->pSchema[1].name); + code = mndModifySuperTableTagName(pOld, &stbObj, pAlter->pSchema[0].name, pAlter->pSchema[1].name); break; case TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN: - code = mndChangeSuperTableTag(pOld, pNew, &pAlter->pSchema[0]); + code = mndChangeSuperTableTag(pOld, &stbObj, &pAlter->pSchema[0]); break; case TSDB_ALTER_TABLE_ADD_COLUMN: - code = mndAddSuperTableColumn(pOld, pNew, pAlter->pSchema, 1); + code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pSchema, 1); break; case TSDB_ALTER_TABLE_DROP_COLUMN: - code = mndDropSuperTableColumn(pOld, pNew, pAlter->pSchema[0].name); + code = mndDropSuperTableColumn(pOld, &stbObj, pAlter->pSchema[0].name); break; case TSDB_ALTER_TABLE_CHANGE_COLUMN: - code = mndChangeSuperTableColumn(pOld, pNew, &pAlter->pSchema[0]); + code = mndChangeSuperTableColumn(pOld, &stbObj, &pAlter->pSchema[0]); break; default: terrno = TSDB_CODE_MND_INVALID_STB_OPTION; break; } - if (code != 0) { - tfree(pNew->pTags); - tfree(pNew->pColumns); - } + if (code != 0) goto UPDATE_STB_OVER; + code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + if (pTrans == NULL) goto UPDATE_STB_OVER; + + mDebug("trans:%d, used to update stb:%s", pTrans->id, pAlter->name); + + if (mndSetUpdateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER; + if (mndSetUpdateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER; + if (mndSetUpdateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_STB_OVER; + + code = 0; + +UPDATE_STB_OVER: + mndTransDrop(pTrans); + tfree(stbObj.pTags); + tfree(stbObj.pColumns); return code; } @@ -909,14 +983,15 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { return -1; } - SStbObj stbObj = {0}; - taosRLockLatch(&pStb->lock); - memcpy(&stbObj, pStb, sizeof(SStbObj)); - stbObj.pColumns = NULL; - stbObj.pTags = NULL; - taosRUnLockLatch(&pStb->lock); + SDbObj *pDb = mndAcquireDbByStb(pMnode, pAlter->name); + if (pDb == NULL) { + mndReleaseStb(pMnode, pStb); + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + mError("stb:%s, failed to update since %s", pAlter->name, terrstr()); + return -1; + } - int32_t code = mndUpdateStb(pAlter, pStb, &stbObj); + int32_t code = mndUpdateStb(pMnode, pReq, pAlter, pDb, pStb); mndReleaseStb(pMnode, pStb); if (code != 0) {