Merge pull request #11065 from taosdata/feature/shm
TD-14354 faileure while execute create database if not exists group_db0 keep 36500
This commit is contained in:
commit
5af4594ff1
|
@ -369,6 +369,10 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_DB_CACHE_LAST_ROW 3
|
||||
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
|
||||
|
||||
#define TSDB_MIN_DB_STREAM_MODE 0
|
||||
#define TSDB_MAX_DB_STREAM_MODE 1
|
||||
#define TSDB_DEFAULT_DB_STREAM_MODE 0
|
||||
|
||||
#define TSDB_MIN_DB_FILE_FACTOR 0
|
||||
#define TSDB_MAX_DB_FILE_FACTOR 1
|
||||
#define TSDB_DEFAULT_DB_FILE_FACTOR 0.1
|
||||
|
|
|
@ -119,14 +119,14 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
|
|||
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
|
||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||
pRpc->pCont = pCont;
|
||||
dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle,
|
||||
pRpc->ahandle);
|
||||
dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType),
|
||||
pRpc->handle, pRpc->ahandle);
|
||||
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
|
||||
int32_t code = (*msgFp)(pWrapper, pMsg);
|
||||
dTrace("msg:%p, is processed, code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
|
||||
if (code != 0) {
|
||||
dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
if (pRpc->msgType & 1U) {
|
||||
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
|
||||
dndSendRsp(pWrapper, &rsp);
|
||||
|
@ -140,8 +140,8 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
|
|||
|
||||
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, int32_t msgLen, void *pCont, int32_t contLen) {
|
||||
pRpc->pCont = pCont;
|
||||
dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType), pRpc->handle,
|
||||
pRpc->ahandle);
|
||||
dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType),
|
||||
pRpc->handle, pRpc->ahandle);
|
||||
|
||||
dndSendRsp(pWrapper, pRpc);
|
||||
taosMemoryFree(pRpc);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in mnode queue", pMsg);
|
||||
dTrace("msg:%p, get from mnode queue", pMsg);
|
||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||
int32_t code = -1;
|
||||
|
||||
|
@ -31,9 +31,11 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
if (pRpc->msgType & 1U) {
|
||||
if (pRpc->handle == NULL) return;
|
||||
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
if (code != 0) code = terrno;
|
||||
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
if (code != 0) {
|
||||
code = terrno;
|
||||
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||
}
|
||||
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
|
||||
dndSendRsp(pMgmt->pWrapper, &rsp);
|
||||
}
|
||||
|
@ -47,7 +49,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
|||
static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in mnode queue", pMsg);
|
||||
dTrace("msg:%p, get from mnode query queue", pMsg);
|
||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||
int32_t code = -1;
|
||||
|
||||
|
@ -55,8 +57,8 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
|||
code = mndProcessMsg(pMsg);
|
||||
|
||||
if (pRpc->msgType & 1U) {
|
||||
if (pRpc->handle == NULL) return;
|
||||
if (code != 0) {
|
||||
if (pRpc->handle != NULL && code != 0) {
|
||||
dError("msg:%p, failed to process since %s", pMsg, terrstr());
|
||||
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .ahandle = pRpc->ahandle};
|
||||
dndSendRsp(pMgmt->pWrapper, &rsp);
|
||||
}
|
||||
|
|
|
@ -276,6 +276,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
|||
if (pCfg->quorum > pCfg->replications) return -1;
|
||||
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1;
|
||||
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
|
||||
if (pCfg->streamMode < TSDB_MIN_DB_STREAM_MODE || pCfg->streamMode > TSDB_MAX_DB_STREAM_MODE) return -1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -285,8 +286,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->totalBlocks < 0) pCfg->totalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS;
|
||||
if (pCfg->daysPerFile < 0) pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
|
||||
if (pCfg->daysToKeep0 < 0) pCfg->daysToKeep0 = TSDB_DEFAULT_KEEP;
|
||||
if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = TSDB_DEFAULT_KEEP;
|
||||
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = TSDB_DEFAULT_KEEP;
|
||||
if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = pCfg->daysToKeep0;
|
||||
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = pCfg->daysToKeep1;
|
||||
if (pCfg->minRows < 0) pCfg->minRows = TSDB_DEFAULT_MIN_ROW_FBLOCK;
|
||||
if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK;
|
||||
if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME;
|
||||
|
@ -298,6 +299,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
|
|||
if (pCfg->quorum < 0) pCfg->quorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
|
||||
if (pCfg->update < 0) pCfg->update = TSDB_DEFAULT_DB_UPDATE_OPTION;
|
||||
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
|
||||
if (pCfg->streamMode < 0) pCfg->streamMode = TSDB_DEFAULT_DB_STREAM_MODE;
|
||||
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
|
||||
|
@ -431,11 +434,11 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
|||
.daysToKeep2 = pCreate->daysToKeep2,
|
||||
.minRows = pCreate->minRows,
|
||||
.maxRows = pCreate->maxRows,
|
||||
.fsyncPeriod = pCreate->fsyncPeriod,
|
||||
.commitTime = pCreate->commitTime,
|
||||
.fsyncPeriod = pCreate->fsyncPeriod,
|
||||
.walLevel = pCreate->walLevel,
|
||||
.precision = pCreate->precision,
|
||||
.compression = pCreate->compression,
|
||||
.walLevel = pCreate->walLevel,
|
||||
.replications = pCreate->replications,
|
||||
.quorum = pCreate->quorum,
|
||||
.update = pCreate->update,
|
||||
|
@ -445,7 +448,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
|||
|
||||
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
||||
dbObj.cfg.pRetensions = pCreate->pRetensions;
|
||||
pCreate = NULL;
|
||||
pCreate->pRetensions = NULL;
|
||||
|
||||
mndSetDefaultDbCfg(&dbObj.cfg);
|
||||
|
||||
|
|
Loading…
Reference in New Issue