minor changes
This commit is contained in:
parent
a15551d6f9
commit
2158727db3
|
@ -421,11 +421,12 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
|
|||
pMgmt->totalVnodes = numOfVnodes;
|
||||
|
||||
int32_t threadNum = pDnode->env.numOfCores;
|
||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||
#if 1
|
||||
vnodesPerThread = 1;
|
||||
threadNum = 1;
|
||||
#endif
|
||||
|
||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||
|
||||
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
threads[t].threadIndex = t;
|
||||
|
|
|
@ -276,12 +276,12 @@ static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj
|
|||
pDrop->head.contLen = htonl(contLen);
|
||||
pDrop->head.vgId = htonl(pVgroup->vgId);
|
||||
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
|
||||
// pDrop->suid = htobe64(pStb->uid);
|
||||
pDrop->suid = htobe64(pStb->uid);
|
||||
|
||||
return pDrop;
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateStbMsg(SMCreateStbReq *pCreate) {
|
||||
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
|
||||
pCreate->numOfTags = htonl(pCreate->numOfTags);
|
||||
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
|
||||
|
@ -363,8 +363,8 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
|
||||
void *pMsg = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pMsg == NULL) {
|
||||
void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -373,11 +373,11 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pMsg;
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_CREATE_STB;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
free(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
|
@ -398,8 +398,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
|
||||
SVDropTbReq *pMsg = mndBuildDropStbReq(pMnode, pVgroup, pStb);
|
||||
if (pMsg == NULL) {
|
||||
SVDropTbReq *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -408,11 +408,11 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pMsg;
|
||||
action.pCont = pReq;
|
||||
action.contLen = sizeof(SVDropTbReq);
|
||||
action.msgType = TDMT_VND_DROP_STB;
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
free(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
|
@ -423,7 +423,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
SStbObj stbObj = {0};
|
||||
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
@ -449,43 +449,17 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCr
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||
|
||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||
if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||
if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_STB_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -494,13 +468,13 @@ CREATE_STB_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SMCreateStbReq *pCreate = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to create", pCreate->name);
|
||||
|
||||
if (mndCheckCreateStbMsg(pCreate) != 0) {
|
||||
if (mndCheckCreateStbReq(pCreate) != 0) {
|
||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -536,7 +510,7 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndCreateStb(pMnode, pMsg, pCreate, pDb);
|
||||
int32_t code = mndCreateStb(pMnode, pReq, pCreate, pDb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -548,12 +522,12 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pMsg) {
|
||||
mndTransProcessRsp(pMsg);
|
||||
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
|
||||
mndTransProcessRsp(pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
|
||||
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
||||
SSchema *pSchema = &pAlter->schema;
|
||||
pSchema->colId = htonl(pSchema->colId);
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
|
@ -578,15 +552,15 @@ static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOld, SStbObj *pNew) { return 0; }
|
||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pOld, SStbObj *pNew) { return 0; }
|
||||
|
||||
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SMAlterStbReq *pAlter = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to alter", pAlter->name);
|
||||
|
||||
if (mndCheckAlterStbMsg(pAlter) != 0) {
|
||||
if (mndCheckAlterStbReq(pAlter) != 0) {
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -601,7 +575,7 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) {
|
|||
SStbObj stbObj = {0};
|
||||
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
||||
|
||||
int32_t code = mndUpdateStb(pMnode, pMsg, pStb, &stbObj);
|
||||
int32_t code = mndUpdateStb(pMnode, pReq, pStb, &stbObj);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -612,8 +586,8 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pMsg) {
|
||||
mndTransProcessRsp(pMsg);
|
||||
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) {
|
||||
mndTransProcessRsp(pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -648,44 +622,19 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj
|
|||
|
||||
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
|
||||
|
||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
|
||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("stb:%s, failed to drop since %s", pStb->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
if (pTrans == NULL)goto DROP_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||
|
||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto DROP_STB_OVER;
|
||||
}
|
||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||
if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||
if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||
if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_STB_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -694,9 +643,9 @@ DROP_STB_OVER:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SMDropStbReq *pDrop = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SMDropStbReq *pDrop = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to drop", pDrop->name);
|
||||
|
||||
|
@ -712,7 +661,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t code = mndDropStb(pMnode, pMsg, pStb);
|
||||
int32_t code = mndDropStb(pMnode, pReq, pStb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
if (code != 0) {
|
||||
|
@ -724,14 +673,14 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pMsg) {
|
||||
mndTransProcessRsp(pMsg);
|
||||
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) {
|
||||
mndTransProcessRsp(pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessStbMetaReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
||||
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
STableInfoReq *pInfo = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
|
||||
|
||||
|
@ -786,8 +735,8 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pMsg) {
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
pMsg->pCont = pMeta;
|
||||
pMsg->contLen = contLen;
|
||||
pReq->pCont = pMeta;
|
||||
pReq->contLen = contLen;
|
||||
|
||||
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
|
||||
return 0;
|
||||
|
@ -820,8 +769,8 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
|
||||
|
@ -883,8 +832,8 @@ static void mndExtractTableName(char *tableId, char *name) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SStbObj *pStb = NULL;
|
||||
|
|
Loading…
Reference in New Issue