TD-10431 fix crash while test dnode create
This commit is contained in:
parent
442a8461ad
commit
6941b14878
|
@ -158,7 +158,7 @@ typedef enum {
|
|||
SDB_USER = 5,
|
||||
SDB_AUTH = 6,
|
||||
SDB_ACCT = 7,
|
||||
SDB_VGROUP = 9,
|
||||
SDB_VGROUP = 8,
|
||||
SDB_STB = 9,
|
||||
SDB_DB = 10,
|
||||
SDB_FUNC = 11,
|
||||
|
|
|
@ -46,7 +46,7 @@ int64_t tsDnodeStartTime = 0;
|
|||
// common
|
||||
int32_t tsRpcTimer = 300;
|
||||
int32_t tsRpcMaxTime = 600; // seconds;
|
||||
int32_t tsRpcForceTcp = 0; //disable this, means query, show command use udp protocol as default
|
||||
int32_t tsRpcForceTcp = 1; //disable this, means query, show command use udp protocol as default
|
||||
int32_t tsMaxShellConns = 50000;
|
||||
int32_t tsMaxConnections = 5000;
|
||||
int32_t tsShellActivityTimer = 3; // second
|
||||
|
@ -1583,7 +1583,7 @@ static void doInitGlobalConfig(void) {
|
|||
taosInitConfigOption(cfg);
|
||||
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM);
|
||||
#else
|
||||
assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
|
||||
//assert(tsGlobalConfigNum == TSDB_CFG_MAX_NUM - 5);
|
||||
#endif
|
||||
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ int32_t dndGetClusterId(SDnode *pDnode);
|
|||
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
|
||||
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
|
||||
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
void dndSendStatusMsg(SDnode *pDnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -335,7 +335,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void dndSendStatusMsg(SDnode *pDnode) {
|
||||
void dndSendStatusMsg(SDnode *pDnode) {
|
||||
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
|
||||
|
||||
SStatusMsg *pStatus = rpcMallocCont(contLen);
|
||||
|
|
|
@ -130,7 +130,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
|
||||
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
|
||||
if (pMsg == NULL || pMsg->pCont == NULL) return;
|
||||
dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
|
||||
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
@ -138,10 +138,9 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
DndMsgFp fp = pMgmt->msgFp[msgType];
|
||||
if (fp != NULL) {
|
||||
(*fp)(pDnode, pMsg, pEpSet);
|
||||
dTrace("RPC %p, rsp:%s app:%p is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->ahandle,
|
||||
pMsg->code & 0XFFFF);
|
||||
dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF);
|
||||
} else {
|
||||
dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
|
||||
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
|
||||
}
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
|
|
@ -194,6 +194,7 @@ SDnode *dndInit(SDnodeOpt *pOption) {
|
|||
}
|
||||
|
||||
dndSetStat(pDnode, DND_STAT_RUNNING);
|
||||
dndSendStatusMsg(pDnode);
|
||||
dndReportStartup(pDnode, "TDengine", "initialized successfully");
|
||||
dInfo("TDengine is initialized successfully");
|
||||
|
||||
|
|
|
@ -80,6 +80,8 @@ TEST_F(DndTestDnode, ShowDnode) {
|
|||
|
||||
sendMsg(pClient, &showRpcMsg);
|
||||
ASSERT_NE(pClient->pRsp, nullptr);
|
||||
ASSERT_EQ(pClient->pRsp->code, 0);
|
||||
ASSERT_NE(pClient->pRsp->pCont, nullptr);
|
||||
|
||||
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
|
||||
ASSERT_NE(pShowRsp, nullptr);
|
||||
|
@ -170,6 +172,7 @@ TEST_F(DndTestDnode, ShowDnode) {
|
|||
sendMsg(pClient, &retrieveRpcMsg);
|
||||
ASSERT_NE(pClient->pRsp, nullptr);
|
||||
ASSERT_EQ(pClient->pRsp->code, 0);
|
||||
ASSERT_NE(pClient->pRsp->pCont, nullptr);
|
||||
|
||||
SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
|
||||
ASSERT_NE(pRetrieveRsp, nullptr);
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "deploy.h"
|
||||
|
||||
void initLog(const char* path) {
|
||||
dDebugFlag = 0;
|
||||
dDebugFlag = 207;
|
||||
vDebugFlag = 0;
|
||||
mDebugFlag = 207;
|
||||
cDebugFlag = 0;
|
||||
|
@ -90,6 +90,7 @@ SServer* createServer(const char* path, const char* fqdn, uint16_t port, const c
|
|||
}
|
||||
|
||||
void dropServer(SServer* pServer) {
|
||||
if (pServer == NULL) return;
|
||||
if (pServer->threadId != NULL) {
|
||||
taosDestoryThread(pServer->threadId);
|
||||
}
|
||||
|
@ -98,6 +99,8 @@ void dropServer(SServer* pServer) {
|
|||
void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
SClient* pClient = (SClient*)parent;
|
||||
pClient->pRsp = pMsg;
|
||||
uInfo("response:%s from dnode, pCont:%p contLen:%d code:0x%X", taosMsg[pMsg->msgType], pMsg->pCont, pMsg->contLen,
|
||||
pMsg->code);
|
||||
tsem_post(&pClient->sem);
|
||||
}
|
||||
|
||||
|
@ -143,7 +146,7 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) {
|
|||
epSet.inUse = 0;
|
||||
epSet.numOfEps = 1;
|
||||
epSet.port[0] = pClient->port;
|
||||
strcpy(epSet.fqdn[0], pClient->fqdn);
|
||||
memcpy(epSet.fqdn[0], pClient->fqdn, TSDB_FQDN_LEN);
|
||||
|
||||
rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL);
|
||||
tsem_wait(&pClient->sem);
|
||||
|
|
|
@ -45,7 +45,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
|
|||
|
||||
int32_t mndInitVgroup(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_VGROUP,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.keyType = SDB_KEY_INT32,
|
||||
.encodeFp = (SdbEncodeFp)mndVgroupActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndVgroupActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
|
||||
|
|
|
@ -126,6 +126,8 @@ typedef struct SRpcConn {
|
|||
SRpcReqContext *pContext; // request context
|
||||
} SRpcConn;
|
||||
|
||||
static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT;
|
||||
|
||||
int tsRpcMaxUdpSize = 15000; // bytes
|
||||
int tsProgressTimer = 100;
|
||||
// not configurable
|
||||
|
@ -220,17 +222,22 @@ static void rpcFree(void *p) {
|
|||
free(p);
|
||||
}
|
||||
|
||||
int32_t rpcInit(void) {
|
||||
tsProgressTimer = tsRpcTimer/2;
|
||||
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
|
||||
tsRpcHeadSize = RPC_MSG_OVERHEAD;
|
||||
static void rpcInitImp(void) {
|
||||
tsProgressTimer = tsRpcTimer / 2;
|
||||
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsProgressTimer;
|
||||
tsRpcHeadSize = RPC_MSG_OVERHEAD;
|
||||
tsRpcOverhead = sizeof(SRpcReqContext);
|
||||
|
||||
tsRpcRefId = taosOpenRef(200, rpcFree);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t rpcInit(void) {
|
||||
pthread_once(&tsRpcInitOnce, rpcInitImp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void rpcCleanup(void) {
|
||||
taosCloseRef(tsRpcRefId);
|
||||
tsRpcRefId = -1;
|
||||
|
|
Loading…
Reference in New Issue