Merge pull request #23422 from taosdata/enh/TD-25879-3.0x
enh: dmodule API dependence and grant process
This commit is contained in:
commit
53e9c483c9
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -51,11 +51,7 @@ typedef enum {
|
||||||
} EGrantType;
|
} EGrantType;
|
||||||
|
|
||||||
int32_t grantCheck(EGrantType grant);
|
int32_t grantCheck(EGrantType grant);
|
||||||
#ifndef TD_GRANT_OPTIMIZE
|
|
||||||
int32_t grantAlterActiveCode(const char* old, const char* newer, char* out, int8_t type);
|
|
||||||
#else
|
|
||||||
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
|
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifndef GRANTS_CFG
|
#ifndef GRANTS_CFG
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
|
|
|
@ -1568,6 +1568,9 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t id;
|
int32_t id;
|
||||||
int8_t isMnode;
|
int8_t isMnode;
|
||||||
|
#ifdef TD_GRANT_HB_OPTIMIZE
|
||||||
|
int8_t offlineReason;
|
||||||
|
#endif
|
||||||
SEp ep;
|
SEp ep;
|
||||||
char active[TSDB_ACTIVE_KEY_LEN];
|
char active[TSDB_ACTIVE_KEY_LEN];
|
||||||
char connActive[TSDB_CONN_ACTIVE_KEY_LEN];
|
char connActive[TSDB_CONN_ACTIVE_KEY_LEN];
|
||||||
|
|
|
@ -163,7 +163,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -97,7 +97,11 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
|
||||||
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
|
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
|
||||||
int32_t dmInitVars(SDnode *pDnode);
|
int32_t dmInitVars(SDnode *pDnode);
|
||||||
void dmClearVars(SDnode *pDnode);
|
void dmClearVars(SDnode *pDnode);
|
||||||
|
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
|
||||||
|
int32_t dmInitModule(SDnode *pDnode, SMgmtWrapper *wrappers);
|
||||||
|
#else
|
||||||
int32_t dmInitModule(SDnode *pDnode);
|
int32_t dmInitModule(SDnode *pDnode);
|
||||||
|
#endif
|
||||||
bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper);
|
bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper);
|
||||||
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
|
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
|
||||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
||||||
|
@ -119,7 +123,11 @@ int32_t dmInitStatusClient(SDnode *pDnode);
|
||||||
void dmCleanupClient(SDnode *pDnode);
|
void dmCleanupClient(SDnode *pDnode);
|
||||||
void dmCleanupStatusClient(SDnode *pDnode);
|
void dmCleanupStatusClient(SDnode *pDnode);
|
||||||
SMsgCb dmGetMsgcb(SDnode *pDnode);
|
SMsgCb dmGetMsgcb(SDnode *pDnode);
|
||||||
|
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
|
||||||
|
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers);
|
||||||
|
#else
|
||||||
int32_t dmInitMsgHandle(SDnode *pDnode);
|
int32_t dmInitMsgHandle(SDnode *pDnode);
|
||||||
|
#endif
|
||||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// dmMonitor.c
|
// dmMonitor.c
|
||||||
|
|
|
@ -66,9 +66,15 @@ int32_t dmInitDnode(SDnode *pDnode) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
|
||||||
|
if (dmInitModule(pDnode, pDnode->wrappers) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
#else
|
||||||
if (dmInitModule(pDnode) != 0) {
|
if (dmInitModule(pDnode) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
indexInit(tsNumOfCommitThreads);
|
indexInit(tsNumOfCommitThreads);
|
||||||
streamMetaInit();
|
streamMetaInit();
|
||||||
|
@ -107,6 +113,77 @@ void dmCleanupDnode(SDnode *pDnode) {
|
||||||
dDebug("dnode is closed, ptr:%p", pDnode);
|
dDebug("dnode is closed, ptr:%p", pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
|
||||||
|
int32_t dmInitVars(SDnode *pDnode) {
|
||||||
|
SDnodeData *pData = &pDnode->data;
|
||||||
|
pData->dnodeId = 0;
|
||||||
|
pData->clusterId = 0;
|
||||||
|
pData->dnodeVer = 0;
|
||||||
|
pData->engineVer = 0;
|
||||||
|
pData->updateTime = 0;
|
||||||
|
pData->rebootTime = taosGetTimestampMs();
|
||||||
|
pData->dropped = 0;
|
||||||
|
pData->stopped = 0;
|
||||||
|
|
||||||
|
pData->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
if (pData->dnodeHash == NULL) {
|
||||||
|
dError("failed to init dnode hash");
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dmReadEps(pData) != 0) {
|
||||||
|
dError("failed to read file since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pData->dropped) {
|
||||||
|
dError("dnode will not start since its already dropped");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadRwlockInit(&pData->lock, NULL);
|
||||||
|
taosThreadMutexInit(&pDnode->mutex, NULL);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void dmClearVars(SDnode *pDnode) {
|
||||||
|
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
|
taosMemoryFreeClear(pWrapper->path);
|
||||||
|
taosThreadRwlockDestroy(&pWrapper->lock);
|
||||||
|
}
|
||||||
|
if (pDnode->lockfile != NULL) {
|
||||||
|
taosUnLockFile(pDnode->lockfile);
|
||||||
|
taosCloseFile(&pDnode->lockfile);
|
||||||
|
pDnode->lockfile = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDnodeData *pData = &pDnode->data;
|
||||||
|
taosThreadRwlockWrlock(&pData->lock);
|
||||||
|
if (pData->oldDnodeEps != NULL) {
|
||||||
|
if (dmWriteEps(pData) == 0) {
|
||||||
|
dmRemoveDnodePairs(pData);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pData->oldDnodeEps);
|
||||||
|
pData->oldDnodeEps = NULL;
|
||||||
|
}
|
||||||
|
if (pData->dnodeEps != NULL) {
|
||||||
|
taosArrayDestroy(pData->dnodeEps);
|
||||||
|
pData->dnodeEps = NULL;
|
||||||
|
}
|
||||||
|
if (pData->dnodeHash != NULL) {
|
||||||
|
taosHashCleanup(pData->dnodeHash);
|
||||||
|
pData->dnodeHash = NULL;
|
||||||
|
}
|
||||||
|
taosThreadRwlockUnlock(&pData->lock);
|
||||||
|
|
||||||
|
taosThreadRwlockDestroy(&pData->lock);
|
||||||
|
taosThreadMutexDestroy(&pDnode->mutex);
|
||||||
|
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
||||||
if (pDnode->status != status) {
|
if (pDnode->status != status) {
|
||||||
dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
|
dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
|
||||||
|
|
|
@ -251,6 +251,33 @@ _OVER:
|
||||||
dmReleaseWrapper(pWrapper);
|
dmReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(TD_MODULE_OPTIMIZE) || !defined(TD_ENTERPRISE)
|
||||||
|
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers) {
|
||||||
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
||||||
|
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||||
|
SMgmtWrapper *pWrapper = wrappers + ntype;
|
||||||
|
SArray *pArray = (*pWrapper->func.getHandlesFp)();
|
||||||
|
if (pArray == NULL) return -1;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
|
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
|
||||||
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
||||||
|
if (pMgmt->needCheckVgId) {
|
||||||
|
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
||||||
|
}
|
||||||
|
if (!pMgmt->needCheckVgId) {
|
||||||
|
pHandle->defaultNtype = ntype;
|
||||||
|
}
|
||||||
|
pWrapper->msgFps[TMSG_INDEX(pMgmt->msgType)] = pMgmt->msgFp;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#else
|
||||||
int32_t dmInitMsgHandle(SDnode *pDnode) {
|
int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
||||||
|
@ -276,6 +303,7 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
SDnode *pDnode = dmInstance();
|
SDnode *pDnode = dmInstance();
|
||||||
|
|
|
@ -397,6 +397,9 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) {
|
||||||
SDnodeInfo dInfo;
|
SDnodeInfo dInfo;
|
||||||
dInfo.id = pDnode->id;
|
dInfo.id = pDnode->id;
|
||||||
dInfo.ep.port = pDnode->port;
|
dInfo.ep.port = pDnode->port;
|
||||||
|
#ifdef TD_GRANT_HB_OPTIMIZE
|
||||||
|
dInfo.offlineReason = pDnode->offlineReason;
|
||||||
|
#endif
|
||||||
tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
tstrncpy(dInfo.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
tstrncpy(dInfo.active, pDnode->active, TSDB_ACTIVE_KEY_LEN);
|
tstrncpy(dInfo.active, pDnode->active, TSDB_ACTIVE_KEY_LEN);
|
||||||
tstrncpy(dInfo.connActive, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN);
|
tstrncpy(dInfo.connActive, pDnode->connActive, TSDB_CONN_ACTIVE_KEY_LEN);
|
||||||
|
@ -781,11 +784,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
|
||||||
|
|
||||||
SDnodeObj tmpDnode = *pDnode;
|
SDnodeObj tmpDnode = *pDnode;
|
||||||
if (action == DND_ACTIVE_CODE) {
|
if (action == DND_ACTIVE_CODE) {
|
||||||
#ifndef TD_GRANT_OPTIMIZE
|
|
||||||
if (grantAlterActiveCode(pDnode->active, pCfgReq->value, tmpDnode.active, 0) != 0) {
|
|
||||||
#else
|
|
||||||
if (grantAlterActiveCode(pDnode->id, pDnode->active, pCfgReq->value, tmpDnode.active, 0) != 0) {
|
if (grantAlterActiveCode(pDnode->id, pDnode->active, pCfgReq->value, tmpDnode.active, 0) != 0) {
|
||||||
#endif
|
|
||||||
if (TSDB_CODE_DUP_KEY != terrno) {
|
if (TSDB_CODE_DUP_KEY != terrno) {
|
||||||
mError("dnode:%d, config dnode:%d, app:%p config:%s value:%s failed since %s", pDnode->id, pCfgReq->dnodeId,
|
mError("dnode:%d, config dnode:%d, app:%p config:%s value:%s failed since %s", pDnode->id, pCfgReq->dnodeId,
|
||||||
pReq->info.ahandle, pCfgReq->config, pCfgReq->value, terrstr());
|
pReq->info.ahandle, pCfgReq->config, pCfgReq->value, terrstr());
|
||||||
|
@ -801,11 +800,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
} else if (action == DND_CONN_ACTIVE_CODE) {
|
} else if (action == DND_CONN_ACTIVE_CODE) {
|
||||||
#ifndef TD_GRANT_OPTIMIZE
|
|
||||||
if (grantAlterActiveCode(pDnode->connActive, pCfgReq->value, tmpDnode.connActive, 1) != 0) {
|
|
||||||
#else
|
|
||||||
if (grantAlterActiveCode(pDnode->id, pDnode->connActive, pCfgReq->value, tmpDnode.connActive, 1) != 0) {
|
if (grantAlterActiveCode(pDnode->id, pDnode->connActive, pCfgReq->value, tmpDnode.connActive, 1) != 0) {
|
||||||
#endif
|
|
||||||
if (TSDB_CODE_DUP_KEY != terrno) {
|
if (TSDB_CODE_DUP_KEY != terrno) {
|
||||||
mError("dnode:%d, config dnode:%d, app:%p config:%s value:%s failed since %s", pDnode->id, pCfgReq->dnodeId,
|
mError("dnode:%d, config dnode:%d, app:%p config:%s value:%s failed since %s", pDnode->id, pCfgReq->dnodeId,
|
||||||
pReq->info.ahandle, pCfgReq->config, pCfgReq->value, terrstr());
|
pReq->info.ahandle, pCfgReq->config, pCfgReq->value, terrstr());
|
||||||
|
|
|
@ -131,13 +131,9 @@ void grantAdd(EGrantType grant, uint64_t value) {}
|
||||||
void grantRestore(EGrantType grant, uint64_t value) {}
|
void grantRestore(EGrantType grant, uint64_t value) {}
|
||||||
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
||||||
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
||||||
#ifndef TD_GRANT_OPTIMIZE
|
|
||||||
int32_t grantAlterActiveCode(const char *old, const char *new, char *out, int8_t type) { return TSDB_CODE_SUCCESS; }
|
|
||||||
#else
|
|
||||||
int32_t grantAlterActiveCode(int32_t did, const char *old, const char *new, char *out, int8_t type) {
|
int32_t grantAlterActiveCode(int32_t did, const char *old, const char *new, char *out, int8_t type) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue