update stb
This commit is contained in:
parent
f24e04e819
commit
c0b8234ffe
|
@ -465,8 +465,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
SStbObj stbObj = {0};
|
||||
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
stbObj.createdTime = taosGetTimestampMs();
|
||||
stbObj.updateTime = stbObj.createdTime;
|
||||
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -477,17 +477,13 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
|
|||
stbObj.numOfTags = pCreate->numOfTags;
|
||||
|
||||
stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema));
|
||||
if (stbObj.pColumns == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema));
|
||||
|
||||
stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema));
|
||||
if (stbObj.pTags == NULL) {
|
||||
if (stbObj.pColumns == NULL || stbObj.pTags == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema));
|
||||
memcpy(stbObj.pTags, pCreate->pSchema + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
|
||||
|
||||
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
|
||||
|
@ -853,41 +849,119 @@ static int32_t mndChangeSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, con
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(const SMAlterStbReq *pAlter, const SStbObj *pOld, SStbObj *pNew) {
|
||||
int32_t code = 0;
|
||||
|
||||
static int32_t mndSetUpdateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
int32_t contLen;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
|
||||
void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_CREATE_STB;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pOld->lock);
|
||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||
stbObj.pColumns = NULL;
|
||||
stbObj.pTags = NULL;
|
||||
stbObj.updateTime = taosGetTimestampMs();
|
||||
taosRUnLockLatch(&pOld->lock);
|
||||
|
||||
int32_t code = -1;
|
||||
|
||||
switch (pAlter->alterType) {
|
||||
case TSDB_ALTER_TABLE_ADD_TAG_COLUMN:
|
||||
code = mndAddSuperTableTag(pOld, pNew, pAlter->pSchema, 1);
|
||||
code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pSchema, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_TAG_COLUMN:
|
||||
code = mndDropSuperTableTag(pOld, pNew, pAlter->pSchema[0].name);
|
||||
code = mndDropSuperTableTag(pOld, &stbObj, pAlter->pSchema[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN:
|
||||
code = mndModifySuperTableTagName(pOld, pNew, pAlter->pSchema[0].name, pAlter->pSchema[1].name);
|
||||
code = mndModifySuperTableTagName(pOld, &stbObj, pAlter->pSchema[0].name, pAlter->pSchema[1].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN:
|
||||
code = mndChangeSuperTableTag(pOld, pNew, &pAlter->pSchema[0]);
|
||||
code = mndChangeSuperTableTag(pOld, &stbObj, &pAlter->pSchema[0]);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
code = mndAddSuperTableColumn(pOld, pNew, pAlter->pSchema, 1);
|
||||
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pSchema, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
code = mndDropSuperTableColumn(pOld, pNew, pAlter->pSchema[0].name);
|
||||
code = mndDropSuperTableColumn(pOld, &stbObj, pAlter->pSchema[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_CHANGE_COLUMN:
|
||||
code = mndChangeSuperTableColumn(pOld, pNew, &pAlter->pSchema[0]);
|
||||
code = mndChangeSuperTableColumn(pOld, &stbObj, &pAlter->pSchema[0]);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
break;
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
tfree(pNew->pTags);
|
||||
tfree(pNew->pColumns);
|
||||
}
|
||||
if (code != 0) goto UPDATE_STB_OVER;
|
||||
|
||||
code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto UPDATE_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to update stb:%s", pTrans->id, pAlter->name);
|
||||
|
||||
if (mndSetUpdateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndSetUpdateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndSetUpdateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_STB_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
UPDATE_STB_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
tfree(stbObj.pTags);
|
||||
tfree(stbObj.pColumns);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -909,14 +983,15 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pStb->lock);
|
||||
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
||||
stbObj.pColumns = NULL;
|
||||
stbObj.pTags = NULL;
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pAlter->name);
|
||||
if (pDb == NULL) {
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("stb:%s, failed to update since %s", pAlter->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndUpdateStb(pAlter, pStb, &stbObj);
|
||||
int32_t code = mndUpdateStb(pMnode, pReq, pAlter, pDb, pStb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
if (code != 0) {
|
||||
|
|
Loading…
Reference in New Issue