minor changes
This commit is contained in:
parent
2af3b3a2bc
commit
19e3f91df5
|
@ -667,25 +667,25 @@ typedef struct {
|
|||
int8_t walLevel;
|
||||
int8_t quorum;
|
||||
int8_t cacheLastRow;
|
||||
} SAlterDbMsg;
|
||||
} SAlterDbReq;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t ignoreNotExists;
|
||||
} SDropDbMsg;
|
||||
} SDropDbReq;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t vgVersion;
|
||||
} SUseDbMsg;
|
||||
} SUseDbReq;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
} SSyncDbMsg;
|
||||
} SSyncDbReq;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
} SCompactDbMsg;
|
||||
} SCompactDbReq;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_FUNC_NAME_LEN];
|
||||
|
@ -699,16 +699,16 @@ typedef struct {
|
|||
int32_t commentSize;
|
||||
int32_t codeSize;
|
||||
char pCont[];
|
||||
} SCreateFuncMsg;
|
||||
} SCreateFuncReq;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_FUNC_NAME_LEN];
|
||||
} SDropFuncMsg;
|
||||
} SDropFuncReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfFuncs;
|
||||
char pFuncNames[];
|
||||
} SRetrieveFuncMsg;
|
||||
} SRetrieveFuncReq;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_FUNC_NAME_LEN];
|
||||
|
@ -932,7 +932,7 @@ typedef struct {
|
|||
char db[TSDB_DB_FNAME_LEN];
|
||||
int32_t numOfVgroup;
|
||||
int32_t vgid[];
|
||||
} SCompactMsg;
|
||||
} SCompactReq;
|
||||
|
||||
typedef struct SShowRsp {
|
||||
int64_t showId;
|
||||
|
@ -1042,7 +1042,7 @@ typedef struct {
|
|||
int8_t align[7];
|
||||
char name[TSDB_STEP_NAME_LEN];
|
||||
char desc[TSDB_STEP_DESC_LEN];
|
||||
} SStartupMsg;
|
||||
} SStartupReq;
|
||||
|
||||
// mq related
|
||||
typedef struct {
|
||||
|
|
|
@ -167,7 +167,7 @@ typedef struct SDnode {
|
|||
SBnodeMgmt bmgmt;
|
||||
SVnodesMgmt vmgmt;
|
||||
STransMgmt tmgmt;
|
||||
SStartupMsg startup;
|
||||
SStartupReq startup;
|
||||
} SDnode;
|
||||
|
||||
EStat dndGetStat(SDnode *pDnode);
|
||||
|
@ -175,7 +175,7 @@ void dndSetStat(SDnode *pDnode, EStat stat);
|
|||
char *dndStatStr(EStat stat);
|
||||
|
||||
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
|
||||
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
|
||||
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -473,12 +473,12 @@ static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
dDebug("startup req is received");
|
||||
|
||||
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
|
||||
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
|
||||
dndGetStartup(pDnode, pStartup);
|
||||
|
||||
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
||||
|
||||
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
|
||||
SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
|
|
|
@ -45,14 +45,14 @@ char *dndStatStr(EStat stat) {
|
|||
}
|
||||
|
||||
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) {
|
||||
SStartupMsg *pStartup = &pDnode->startup;
|
||||
SStartupReq *pStartup = &pDnode->startup;
|
||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||
pStartup->finished = 0;
|
||||
}
|
||||
|
||||
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup) {
|
||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupMsg));
|
||||
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
||||
pStartup->finished = (dndGetStat(pDnode) == DND_STAT_RUNNING);
|
||||
}
|
||||
|
||||
|
|
|
@ -194,7 +194,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
|
|||
SDbObj *mndAcquireDb(SMnode *pMnode, char *db) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db);
|
||||
if (pDb == NULL) {
|
||||
if (pDb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
}
|
||||
return pDb;
|
||||
|
@ -495,7 +495,7 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
|
|||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) {
|
||||
static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||
terrno = TSDB_CODE_MND_DB_OPTION_UNCHANGED;
|
||||
|
||||
if (pAlter->totalBlocks >= 0 && pAlter->totalBlocks != pDb->cfg.totalBlocks) {
|
||||
|
@ -646,7 +646,7 @@ UPDATE_DB_OVER:
|
|||
|
||||
static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SAlterDbMsg *pAlter = pReq->rpcMsg.pCont;
|
||||
SAlterDbReq *pAlter = pReq->rpcMsg.pCont;
|
||||
pAlter->totalBlocks = htonl(pAlter->totalBlocks);
|
||||
pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0);
|
||||
pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1);
|
||||
|
@ -793,7 +793,7 @@ DROP_DB_OVER:
|
|||
|
||||
static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SDropDbMsg *pDrop = pReq->rpcMsg.pCont;
|
||||
SDropDbReq *pDrop = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("db:%s, start to drop", pDrop->db);
|
||||
|
||||
|
@ -823,7 +823,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) {
|
|||
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SUseDbMsg *pUse = pReq->rpcMsg.pCont;
|
||||
SUseDbReq *pUse = pReq->rpcMsg.pCont;
|
||||
pUse->vgVersion = htonl(pUse->vgVersion);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
|
||||
|
@ -836,6 +836,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
|||
int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo);
|
||||
SUseDbRsp *pRsp = rpcMallocCont(contLen);
|
||||
if (pRsp == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -890,7 +891,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
|||
|
||||
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SSyncDbMsg *pSync = pReq->rpcMsg.pCont;
|
||||
SSyncDbReq *pSync = pReq->rpcMsg.pCont;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pSync->db);
|
||||
if (pDb == NULL) {
|
||||
mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr());
|
||||
|
@ -903,7 +904,7 @@ static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) {
|
|||
|
||||
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SCompactDbMsg *pCompact = pReq->rpcMsg.pCont;
|
||||
SCompactDbReq *pCompact = pReq->rpcMsg.pCont;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db);
|
||||
if (pDb == NULL) {
|
||||
mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr());
|
||||
|
|
|
@ -26,7 +26,7 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
|
|||
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
|
||||
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
|
||||
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc);
|
||||
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pCreate);
|
||||
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate);
|
||||
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc);
|
||||
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg);
|
||||
|
@ -156,7 +156,7 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNe
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pCreate) {
|
||||
static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncReq *pCreate) {
|
||||
SFuncObj *pFunc = calloc(1, sizeof(SFuncObj) + pCreate->commentSize + pCreate->codeSize);
|
||||
pFunc->createdTime = taosGetTimestampMs();
|
||||
pFunc->funcType = pCreate->funcType;
|
||||
|
@ -264,7 +264,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
|
|||
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
|
||||
SCreateFuncMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||
SCreateFuncReq *pCreate = pMsg->rpcMsg.pCont;
|
||||
pCreate->outputLen = htonl(pCreate->outputLen);
|
||||
pCreate->bufSize = htonl(pCreate->bufSize);
|
||||
pCreate->sigature = htobe64(pCreate->sigature);
|
||||
|
@ -323,7 +323,7 @@ static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SDropFuncMsg *pDrop = pMsg->rpcMsg.pCont;
|
||||
SDropFuncReq *pDrop = pMsg->rpcMsg.pCont;
|
||||
|
||||
mDebug("func:%s, start to drop", pDrop->name);
|
||||
|
||||
|
@ -353,7 +353,7 @@ static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg) {
|
|||
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
|
||||
SRetrieveFuncMsg *pRetrieve = pMsg->rpcMsg.pCont;
|
||||
SRetrieveFuncReq *pRetrieve = pMsg->rpcMsg.pCont;
|
||||
pRetrieve->numOfFuncs = htonl(pRetrieve->numOfFuncs);
|
||||
|
||||
int32_t size = sizeof(SRetrieveFuncRsp) + (sizeof(SFuncInfo) + TSDB_FUNC_CODE_LEN) * pRetrieve->numOfFuncs + 16384;
|
||||
|
|
|
@ -136,9 +136,9 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
CheckBinary("master", 9);
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SAlterDbMsg);
|
||||
int32_t contLen = sizeof(SAlterDbReq);
|
||||
|
||||
SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(contLen);
|
||||
SAlterDbReq* pReq = (SAlterDbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->db, "1.d1");
|
||||
pReq->totalBlocks = htonl(12);
|
||||
pReq->daysToKeep0 = htonl(300);
|
||||
|
@ -205,9 +205,9 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
CheckInt8(0); // update
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SDropDbMsg);
|
||||
int32_t contLen = sizeof(SDropDbReq);
|
||||
|
||||
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen);
|
||||
SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->db, "1.d1");
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
|
||||
|
@ -261,9 +261,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
|
|||
CheckBinary("d2", TSDB_DB_NAME_LEN - 1);
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SUseDbMsg);
|
||||
int32_t contLen = sizeof(SUseDbReq);
|
||||
|
||||
SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(contLen);
|
||||
SUseDbReq* pReq = (SUseDbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->db, "1.d2");
|
||||
pReq->vgVersion = htonl(-1);
|
||||
|
||||
|
@ -314,9 +314,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
|
|||
}
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SDropDbMsg);
|
||||
int32_t contLen = sizeof(SDropDbReq);
|
||||
|
||||
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen);
|
||||
SDropDbReq* pReq = (SDropDbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->db, "1.d2");
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen);
|
||||
|
|
|
@ -842,11 +842,11 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg));
|
||||
SUseDbReq* pUseDbMsg = (SUseDbReq*)calloc(1, sizeof(SUseDbReq));
|
||||
tNameExtractFullName(&n, pUseDbMsg->db);
|
||||
|
||||
pDcl->pMsg = (char*)pUseDbMsg;
|
||||
pDcl->msgLen = sizeof(SUseDbMsg);
|
||||
pDcl->msgLen = sizeof(SUseDbReq);
|
||||
pDcl->msgType = TDMT_MND_USE_DB;
|
||||
break;
|
||||
}
|
||||
|
@ -895,14 +895,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
|||
goto _error;
|
||||
}
|
||||
|
||||
SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg));
|
||||
SDropDbReq* pDropDbMsg = (SDropDbReq*)calloc(1, sizeof(SDropDbReq));
|
||||
|
||||
code = tNameExtractFullName(&name, pDropDbMsg->db);
|
||||
pDropDbMsg->ignoreNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
|
||||
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_DB_NAME_T);
|
||||
|
||||
pDcl->msgType = TDMT_MND_DROP_DB;
|
||||
pDcl->msgLen = sizeof(SDropDbMsg);
|
||||
pDcl->msgLen = sizeof(SDropDbReq);
|
||||
pDcl->pMsg = (char*)pDropDbMsg;
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
|
|||
|
||||
SBuildUseDBInput* bInput = (SBuildUseDBInput *)input;
|
||||
|
||||
int32_t estimateSize = sizeof(SUseDbMsg);
|
||||
int32_t estimateSize = sizeof(SUseDbReq);
|
||||
if (NULL == *msg || msgSize < estimateSize) {
|
||||
tfree(*msg);
|
||||
*msg = rpcMallocCont(estimateSize);
|
||||
|
@ -68,7 +68,7 @@ int32_t queryBuildUseDbMsg(void* input, char **msg, int32_t msgSize, int32_t *ms
|
|||
}
|
||||
}
|
||||
|
||||
SUseDbMsg *bMsg = (SUseDbMsg *)*msg;
|
||||
SUseDbReq *bMsg = (SUseDbReq *)*msg;
|
||||
|
||||
strncpy(bMsg->db, bInput->db, sizeof(bMsg->db));
|
||||
bMsg->db[sizeof(bMsg->db) - 1] = 0;
|
||||
|
|
|
@ -338,7 +338,7 @@ void *taosNetInitRpc(char *secretEncrypt, char spi) {
|
|||
return pRpcConn;
|
||||
}
|
||||
|
||||
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupMsg *pStep) {
|
||||
static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t pktLen, char spi, SStartupReq *pStep) {
|
||||
SRpcEpSet epSet;
|
||||
SRpcMsg reqMsg;
|
||||
SRpcMsg rspMsg;
|
||||
|
@ -374,7 +374,7 @@ static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t p
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupMsg)) {
|
||||
if (pStep != NULL && rspMsg.pCont != NULL && rspMsg.contLen > 0 && rspMsg.contLen <= sizeof(SStartupReq)) {
|
||||
memcpy(pStep, rspMsg.pCont, rspMsg.contLen);
|
||||
code = 1;
|
||||
}
|
||||
|
@ -384,8 +384,8 @@ static int32_t taosNetCheckRpc(const char* serverFqdn, uint16_t port, uint16_t p
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t taosNetParseStartup(SStartupMsg *pCont) {
|
||||
SStartupMsg *pStep = pCont;
|
||||
static int32_t taosNetParseStartup(SStartupReq *pCont) {
|
||||
SStartupReq *pStep = pCont;
|
||||
uInfo("step:%s desc:%s", pStep->name, pStep->desc);
|
||||
|
||||
if (pStep->finished) {
|
||||
|
@ -398,7 +398,7 @@ static int32_t taosNetParseStartup(SStartupMsg *pCont) {
|
|||
static void taosNetTestStartup(char *host, int32_t port) {
|
||||
uInfo("check startup, host:%s port:%d\n", host, port);
|
||||
|
||||
SStartupMsg *pStep = malloc(sizeof(SStartupMsg));
|
||||
SStartupReq *pStep = malloc(sizeof(SStartupReq));
|
||||
while (1) {
|
||||
int32_t code = taosNetCheckRpc(host, port + TSDB_PORT_DNODEDNODE, 20, 0, pStep);
|
||||
if (code > 0) {
|
||||
|
|
Loading…
Reference in New Issue