minor changes

This commit is contained in:
Shengliang Guan 2022-01-16 23:38:50 -08:00
parent ca10ef4490
commit a15551d6f9
3 changed files with 54 additions and 51 deletions

View File

@ -1332,7 +1332,7 @@ typedef struct {
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
int64_t suid;
} SVDropTbReq;
typedef struct {

View File

@ -422,6 +422,9 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
int32_t threadNum = pDnode->env.numOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
#if 1
vnodesPerThread = 1;
#endif
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
for (int32_t t = 0; t < threadNum; ++t) {

View File

@ -31,16 +31,16 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb);
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb);
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg);
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg);
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg);
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq);
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq);
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp);
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp);
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp);
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq);
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
int32_t mndInitStb(SMnode *pMnode) {
@ -52,13 +52,13 @@ int32_t mndInitStb(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcesSMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcesSMAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcesSMDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaMsg);
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
@ -177,27 +177,27 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
return 0;
}
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb);
atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
atomic_exchange_32(&pOldStb->version, pNewStb->version);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
atomic_exchange_32(&pOld->updateTime, pNew->updateTime);
atomic_exchange_32(&pOld->version, pNew->version);
taosWLockLatch(&pOldStb->lock);
pOldStb->numOfColumns = pNewStb->numOfColumns;
pOldStb->numOfTags = pNewStb->numOfTags;
int32_t totalCols = pNewStb->numOfTags + pNewStb->numOfColumns;
taosWLockLatch(&pOld->lock);
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema);
if (pOldStb->numOfTags + pOldStb->numOfColumns < totalCols) {
if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
void *pSchema = malloc(totalSize);
if (pSchema != NULL) {
free(pOldStb->pSchema);
pOldStb->pSchema = pSchema;
free(pOld->pSchema);
pOld->pSchema = pSchema;
}
}
memcpy(pOldStb->pSchema, pNewStb->pSchema, totalSize);
taosWUnLockLatch(&pOldStb->lock);
memcpy(pOld->pSchema, pNew->pSchema, totalSize);
taosWUnLockLatch(&pOld->lock);
return 0;
}
@ -215,7 +215,7 @@ void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
sdbRelease(pSdb, pStb);
}
static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
SName name = {0};
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@ -225,17 +225,17 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
return mndAcquireDb(pMnode, db);
}
static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) {
static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SVCreateTbReq req;
void *buf;
int bsize;
int32_t bsize;
SMsgHead *pMsgHead;
req.ver = 0;
SName name = {0};
tNameFromString(&name, pStb->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
req.name = (char*) tNameGetTableName(&name);
req.name = (char *)tNameGetTableName(&name);
req.ttl = 0;
req.keep = 0;
req.type = TD_SUPER_TABLE;
@ -264,7 +264,7 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
return buf;
}
static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropTbReq);
SVDropTbReq *pDrop = calloc(1, contLen);
@ -356,14 +356,14 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
int contLen;
int32_t contLen;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
void *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb, &contLen);
void *pMsg = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
@ -398,7 +398,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
SVDropTbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
SVDropTbReq *pMsg = mndBuildDropStbReq(pMnode, pVgroup, pStb);
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
@ -494,8 +494,8 @@ CREATE_STB_OVER:
return code;
}
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to create", pCreate->name);
@ -548,7 +548,7 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
return 0;
}
@ -578,10 +578,10 @@ static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
return 0;
}
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOld, SStbObj *pNew) { return 0; }
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name);
@ -612,7 +612,7 @@ static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
return 0;
}
@ -694,8 +694,8 @@ DROP_STB_OVER:
return 0;
}
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndProcessMDropStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMDropStbReq *pDrop = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to drop", pDrop->name);
@ -724,12 +724,12 @@ static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
return 0;
}
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessStbMetaReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;