diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index d295e772e8..24ded1f90a 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -32,6 +32,18 @@ typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); + +typedef struct SMnodeMsg { + char user[TSDB_USER_LEN]; + char db[TSDB_DB_FNAME_LEN]; + int32_t acctId; + SMnode* pMnode; + int64_t createdTime; + SRpcMsg rpcMsg; + int32_t contLen; + void* pCont; +} SMnodeMsg; + typedef struct { int32_t dnodeId; int64_t clusterId; @@ -111,14 +123,7 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha * @param pMsg The request rpc msg. * @return int32_t The created mnode msg. */ -SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg); - -/** - * @brief Cleanup mnode msg. - * - * @param pMsg The request msg. - */ -void mndCleanupMsg(SMnodeMsg *pMsg); +int32_t mndBuildMsg(SMnodeMsg *pMnodeMsg, SRpcMsg *pRpcMsg); /** * @brief Cleanup mnode msg. diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 067404a025..b91c9283fe 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -48,6 +48,7 @@ typedef struct { void *pParent; bool stopFlag; bool testFlag; + bool isChild; } SProcObj; SProcObj *taosProcInit(const SProcCfg *pCfg); diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 7f8e11b7aa..445a8bbf31 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -62,6 +62,8 @@ typedef struct { SDnodeWorker statusWorker; } SDnodeMgmt; +typedef enum { SINGLE_PROC, MULTI_PROC_PARENT, MULTI_PROC_CHILD } EProcType; + typedef struct { int32_t refCount; int8_t deployed; @@ -76,8 +78,10 @@ typedef struct { SReplica replicas[TSDB_MAX_REPLICA]; // - bool multiProcess; + MndMsgFp msgFp[TDMT_MAX]; SProcObj *pProcess; + bool singleProc; + bool isChild; } SMnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 10e79f6710..d7147bc6ea 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -61,6 +61,8 @@ typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_ENV_INIT = 0, DND_ENV_READY = 1, DND_ENV_CLEANUP = 2 } EEnvStat; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); +typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMnodeMsg *pMnodeMsg); + EStat dndGetStat(SDnode *pDnode); void dndSetStat(SDnode *pDnode, EStat stat); const char *dndStatStr(EStat stat); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h index 40dd9e52f3..3a297368ce 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h +++ b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h @@ -21,24 +21,34 @@ extern "C" { #endif #include "dndEnv.h" +// interface int32_t mmInit(SDnode *pDnode); void mmCleanup(SDnode *pDnode); +// internal +void mmInitMsgFp(SMnodeMgmt *pMgmt); + +SMnode *mmAcquire(SDnode *pDnode); +void mmRelease(SDnode *pDnode, SMnode *pMnode); + +// mmFile int32_t mmReadFile(SDnode *pDnode); int32_t mmWriteFile(SDnode *pDnode); +// mmMsg +void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); + +// mmQueue +int32_t mmWriteToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg); //////////// int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); -void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo); + SMonGrantInfo *pGrantInfo); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c index 39a1803456..89d7eefab2 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c @@ -15,153 +15,154 @@ #define _DEFAULT_SOURCE #include "mm.h" -#include "dndMgmt.h" -#include "dndTransport.h" -#include "dndWorker.h" -// int32_t mmReadFile(SMnodeMgmt *pMgmt) { -// int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; -// int32_t len = 0; -// int32_t maxLen = 4096; -// char *content = calloc(1, maxLen + 1); -// cJSON *root = NULL; +int32_t mmReadFile(SDnode *pDnode) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; -// char file[PATH_MAX] = {0}; -// snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); + int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + char file[PATH_MAX + 20]; -// TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); -// if (pFile == NULL) { -// dDebug("file %s not exist", file); -// code = 0; -// goto PRASE_MNODE_OVER; -// } + snprintf(file, sizeof(file), "%s%smnode.json", pDnode->dir.dnode, TD_DIRSEP); -// len = (int32_t)taosReadFile(pFile, content, maxLen); -// if (len <= 0) { -// dError("failed to read %s since content is null", file); -// goto PRASE_MNODE_OVER; -// } + TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); + if (pFile == NULL) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_MNODE_OVER; + } -// content[len] = 0; -// root = cJSON_Parse(content); -// if (root == NULL) { -// dError("failed to read %s since invalid json format", file); -// goto PRASE_MNODE_OVER; -// } + len = (int32_t)taosReadFile(pFile, content, maxLen); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_MNODE_OVER; + } -// cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); -// if (!deployed || deployed->type != cJSON_Number) { -// dError("failed to read %s since deployed not found", file); -// goto PRASE_MNODE_OVER; -// } -// pMgmt->deployed = deployed->valueint; + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_MNODE_OVER; + } -// cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); -// if (!dropped || dropped->type != cJSON_Number) { -// dError("failed to read %s since dropped not found", file); -// goto PRASE_MNODE_OVER; -// } -// pMgmt->dropped = dropped->valueint; + cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); + if (!deployed || deployed->type != cJSON_Number) { + dError("failed to read %s since deployed not found", file); + goto PRASE_MNODE_OVER; + } + pMgmt->deployed = deployed->valueint; -// cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); -// if (!mnodes || mnodes->type != cJSON_Array) { -// dError("failed to read %s since nodes not found", file); -// goto PRASE_MNODE_OVER; -// } + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_Number) { + dError("failed to read %s since dropped not found", file); + goto PRASE_MNODE_OVER; + } + pMgmt->dropped = dropped->valueint; -// pMgmt->replica = cJSON_GetArraySize(mnodes); -// if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { -// dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); -// goto PRASE_MNODE_OVER; -// } + cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); + if (!mnodes || mnodes->type != cJSON_Array) { + dError("failed to read %s since nodes not found", file); + goto PRASE_MNODE_OVER; + } -// for (int32_t i = 0; i < pMgmt->replica; ++i) { -// cJSON *node = cJSON_GetArrayItem(mnodes, i); -// if (node == NULL) break; + pMgmt->replica = cJSON_GetArraySize(mnodes); + if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { + dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); + goto PRASE_MNODE_OVER; + } -// SReplica *pReplica = &pMgmt->replicas[i]; + for (int32_t i = 0; i < pMgmt->replica; ++i) { + cJSON *node = cJSON_GetArrayItem(mnodes, i); + if (node == NULL) break; -// cJSON *id = cJSON_GetObjectItem(node, "id"); -// if (!id || id->type != cJSON_Number) { -// dError("failed to read %s since id not found", file); -// goto PRASE_MNODE_OVER; -// } -// pReplica->id = id->valueint; + SReplica *pReplica = &pMgmt->replicas[i]; -// cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); -// if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { -// dError("failed to read %s since fqdn not found", file); -// goto PRASE_MNODE_OVER; -// } -// tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); + cJSON *id = cJSON_GetObjectItem(node, "id"); + if (!id || id->type != cJSON_Number) { + dError("failed to read %s since id not found", file); + goto PRASE_MNODE_OVER; + } + pReplica->id = id->valueint; -// cJSON *port = cJSON_GetObjectItem(node, "port"); -// if (!port || port->type != cJSON_Number) { -// dError("failed to read %s since port not found", file); -// goto PRASE_MNODE_OVER; -// } -// pReplica->port = port->valueint; -// } + cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); + if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { + dError("failed to read %s since fqdn not found", file); + goto PRASE_MNODE_OVER; + } + tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); -// code = 0; -// dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + cJSON *port = cJSON_GetObjectItem(node, "port"); + if (!port || port->type != cJSON_Number) { + dError("failed to read %s since port not found", file); + goto PRASE_MNODE_OVER; + } + pReplica->port = port->valueint; + } -// PRASE_MNODE_OVER: -// if (content != NULL) free(content); -// if (root != NULL) cJSON_Delete(root); -// if (pFile != NULL) taosCloseFile(&pFile); + code = 0; + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); -// terrno = code; -// return code; -// } +PRASE_MNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (pFile != NULL) taosCloseFile(&pFile); -// int32_t mmWriteFile(SMnodeMgmt *pMgmt) { -// char file[PATH_MAX] = {0}; -// snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); + terrno = code; + return code; +} -// TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); -// if (pFile == NULL) { -// terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; -// dError("failed to write %s since %s", file, terrstr()); -// return -1; -// } +int32_t mmWriteFile(SDnode *pDnode) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; -// int32_t len = 0; -// int32_t maxLen = 4096; -// char *content = calloc(1, maxLen + 1); + char file[PATH_MAX]; + snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP); -// len += snprintf(content + len, maxLen - len, "{\n"); -// len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; + dError("failed to write %s since %s", file, terrstr()); + return -1; + } -// len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); -// len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); -// for (int32_t i = 0; i < pMgmt->replica; ++i) { -// SReplica *pReplica = &pMgmt->replicas[i]; -// len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); -// len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); -// len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); -// if (i < pMgmt->replica - 1) { -// len += snprintf(content + len, maxLen - len, " },{\n"); -// } else { -// len += snprintf(content + len, maxLen - len, " }]\n"); -// } -// } -// len += snprintf(content + len, maxLen - len, "}\n"); + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); -// taosWriteFile(pFile, content, len); -// taosFsyncFile(pFile); -// taosCloseFile(&pFile); -// free(content); + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); -// char realfile[PATH_MAX]; -// snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); + for (int32_t i = 0; i < pMgmt->replica; ++i) { + SReplica *pReplica = &pMgmt->replicas[i]; + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); + if (i < pMgmt->replica - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); -// if (taosRenameFile(file, realfile) != 0) { -// terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; -// dError("failed to rename %s since %s", file, terrstr()); -// return -1; -// } + taosWriteFile(pFile, content, len); + taosFsyncFile(pFile); + taosCloseFile(&pFile); + free(content); -// dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); -// return 0; -// } \ No newline at end of file + char realfile[PATH_MAX + 20]; + snprintf(realfile, sizeof(realfile), "%s%smnode.json", pDnode->dir.dnode, TD_DIRSEP); + + if (taosRenameFile(file, realfile) != 0) { + terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; + dError("failed to rename %s since %s", file, terrstr()); + return -1; + } + + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); + return 0; +} diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index 57ce540b7e..9669d7d132 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -14,15 +14,17 @@ */ #define _DEFAULT_SOURCE +#include "mm.h" + #include "dndMgmt.h" #include "dndTransport.h" #include "dndWorker.h" -#include "mm.h" -static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg); + + static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static SMnode *dndAcquireMnode(SDnode *pDnode) { +SMnode *mmAcquire(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = NULL; int32_t refCount = 0; @@ -42,7 +44,7 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { return pMnode; } -static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { +void mmRelease(SDnode *pDnode, SMnode *pMnode) { if (pMnode == NULL) return; SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -52,158 +54,6 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { dTrace("release mnode, refCount:%d", refCount); } - int32_t mmReadFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 4096; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "r"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); - if (pFile == NULL) { - dDebug("file %s not exist", file); - code = 0; - goto PRASE_MNODE_OVER; - } - - len = (int32_t)taosReadFile(pFile, content, maxLen); - if (len <= 0) { - dError("failed to read %s since content is null", file); - goto PRASE_MNODE_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", file); - goto PRASE_MNODE_OVER; - } - - cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); - if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", file); - goto PRASE_MNODE_OVER; - } - pMgmt->deployed = deployed->valueint; - - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", file); - goto PRASE_MNODE_OVER; - } - pMgmt->dropped = dropped->valueint; - - cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); - if (!mnodes || mnodes->type != cJSON_Array) { - dError("failed to read %s since nodes not found", file); - goto PRASE_MNODE_OVER; - } - - pMgmt->replica = cJSON_GetArraySize(mnodes); - if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { - dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); - goto PRASE_MNODE_OVER; - } - - for (int32_t i = 0; i < pMgmt->replica; ++i) { - cJSON *node = cJSON_GetArrayItem(mnodes, i); - if (node == NULL) break; - - SReplica *pReplica = &pMgmt->replicas[i]; - - cJSON *id = cJSON_GetObjectItem(node, "id"); - if (!id || id->type != cJSON_Number) { - dError("failed to read %s since id not found", file); - goto PRASE_MNODE_OVER; - } - pReplica->id = id->valueint; - - cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); - if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - dError("failed to read %s since fqdn not found", file); - goto PRASE_MNODE_OVER; - } - tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); - - cJSON *port = cJSON_GetObjectItem(node, "port"); - if (!port || port->type != cJSON_Number) { - dError("failed to read %s since port not found", file); - goto PRASE_MNODE_OVER; - } - pReplica->port = port->valueint; - } - - code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); - -PRASE_MNODE_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (pFile != NULL) taosCloseFile(&pFile); - - terrno = code; - return code; -} - - int32_t mmWriteFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/mnode.json.bak", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "w"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; - dError("failed to write %s since %s", file, terrstr()); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 4096; - char *content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); - - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); - len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - for (int32_t i = 0; i < pMgmt->replica; ++i) { - SReplica *pReplica = &pMgmt->replicas[i]; - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); - if (i < pMgmt->replica - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); - taosCloseFile(&pFile); - free(content); - - char realfile[PATH_MAX + 20]; - snprintf(realfile, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode); - - if (taosRenameFile(file, realfile) != 0) { - terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", file, terrstr()); - return -1; - } - - dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); - return 0; -} - static int32_t mmStartWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) { @@ -256,14 +106,40 @@ static bool mmDeployRequired(SDnode *pDnode) { return true; } -static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { - dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); - return 0; +static int32_t mmPutMsgToQueue(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) { + int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen; + SMnodeMsg *pMnodeMsg = taosAllocateQitem(contLen); + if (pMnodeMsg == NULL) { + return -1; + } + + pMnodeMsg->contLen = pRpcMsg->contLen; + pMnodeMsg->pCont = (char *)pMnodeMsg + sizeof(SMnodeMsg); + memcpy(pMnodeMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen); + rpcFreeCont(pRpcMsg->pCont); + + int32_t code = mmWriteToWorker(pDnode, pWorker, pMnodeMsg); + if (code != 0) { + taosFreeQitem(pMnodeMsg); + } + + return code; } -static int32_t dndPutMsgToMReadQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { - dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg); - return 0; +static int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { + return mmPutMsgToQueue(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); +} + +static int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { + return mmPutMsgToQueue(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg); +} + +static void mmProcessChildQueue(SDnode *pDnode, SBlockItem *pBlock) { + SMnodeMsg *pMsg = (SMnodeMsg *)pBlock->pCont; + + if (mmWriteToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg) != 0) { + //todo + } } static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) { @@ -271,8 +147,8 @@ static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) { pOption->sendReqToDnodeFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; pOption->sendRedirectRspFp = dndSendRedirectRsp; - pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ; - pOption->putReqToMReadQFp = dndPutMsgToMReadQ; + pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue; + pOption->putReqToMReadQFp = mmPutMsgToReadQueue; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); } @@ -364,23 +240,18 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { return 0; } -static void dndMnodeProcessChildQueue(SDnode *pDnode, SBlockItem *pBlock) { - SRpcMsg *pMsg = (SRpcMsg *)pBlock->pCont; - dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); - free(pBlock); -} static void dndMnodeProcessParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pItem) {} static int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->multiProcess = true; + pMgmt->singleProc = false; int32_t code = mmOpenImp(pDnode, pOption); - if (code == 0 && pMgmt->multiProcess) { + if (code == 0 && !pMgmt->singleProc) { SProcCfg cfg = {0}; - cfg.childFp = (ProcFp)dndMnodeProcessChildQueue; + cfg.childFp = (ProcFp)mmProcessChildQueue; cfg.parentFp = (ProcFp)dndMnodeProcessParentQueue; cfg.childQueueSize = 1024 * 1024; cfg.parentQueueSize = 1024 * 1024; @@ -400,7 +271,7 @@ static int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); + SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { dError("failed to alter mnode since %s", terrstr()); return -1; @@ -408,18 +279,18 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) { if (mndAlter(pMnode, pOption) != 0) { dError("failed to alter mnode since %s", terrstr()); - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); return -1; } - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); return 0; } static int32_t dndDropMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); + SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { dError("failed to drop mnode since %s", terrstr()); return -1; @@ -434,12 +305,12 @@ static int32_t dndDropMnode(SDnode *pDnode) { pMgmt->dropped = 0; taosRUnLockLatch(&pMgmt->latch); - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); dError("failed to drop mnode since %s", terrstr()); return -1; } - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); mmStopWorker(pDnode); pMgmt->deployed = 0; mmWriteFile(pDnode); @@ -470,9 +341,9 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SMnode *pMnode = dndAcquireMnode(pDnode); + SMnode *pMnode = mmAcquire(pDnode); if (pMnode != NULL) { - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; dError("failed to create mnode since %s", terrstr()); return -1; @@ -502,7 +373,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SMnode *pMnode = dndAcquireMnode(pDnode); + SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; dError("failed to alter mnode since %s", terrstr()); @@ -511,7 +382,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("start to alter mnode"); int32_t code = dndAlterMnode(pDnode, &option); - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); return code; } @@ -529,7 +400,7 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SMnode *pMnode = dndAcquireMnode(pDnode); + SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; dError("failed to drop mnode since %s", terrstr()); @@ -538,7 +409,7 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("start to drop mnode"); int32_t code = dndDropMnode(pDnode); - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); return code; } @@ -546,66 +417,15 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); + SMnode *pMnode = mmAcquire(pDnode); if (pMnode != NULL) { mndProcessMsg(pMsg); - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); } else { mndSendRsp(pMsg, terrno); } - mndCleanupMsg(pMsg); -} - -static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) { - int32_t code = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; - - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode != NULL) { - SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); - if (pMsg == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - code = dndWriteMsgToWorker(pWorker, pMsg, 0); - if (code != 0) code = terrno; - } - - if (code != 0) { - mndCleanupMsg(pMsg); - } - } - dndReleaseMnode(pDnode, pMnode); - - if (code != 0) { - if (pRpcMsg->msgType & 1u) { - if (code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) { - dndSendRedirectRsp(pDnode, pRpcMsg); - } else { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code}; - rpcSendResponse(&rsp); - } - } - rpcFreeCont(pRpcMsg->pCont); - } -} - -static void dndMnodeWriteToChildQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - taosProcPushChild(pMgmt->pProcess, pMsg, sizeof(SRpcMsg)); -} - -void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg); - // dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); -} - -void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg); - // dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); -} - -void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndMnodeWriteToChildQueue(&pDnode->mmgmt, pMsg); - // dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); + // mndCleanupMsg(pMsg); } int32_t mmInit(SDnode *pDnode) { @@ -614,6 +434,7 @@ int32_t mmInit(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); + mmInitMsgFp(pMgmt); if (mmReadFile(pDnode) != 0) { goto _OVER; @@ -670,7 +491,7 @@ void mmCleanup(SDnode *pDnode) { int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); + SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { terrno = TSDB_CODE_APP_NOT_READY; dTrace("failed to get user auth since %s", terrstr()); @@ -678,18 +499,18 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc } int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey); - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); return code; } int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo) { - SMnode *pMnode = dndAcquireMnode(pDnode); + SMonGrantInfo *pGrantInfo) { + SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) return -1; int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); - dndReleaseMnode(pDnode, pMnode); + mmRelease(pDnode, pMnode); return code; } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c index e69de29bb2..0e8934fe4e 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mm.h" + +#include "dndMgmt.h" +#include "dndTransport.h" +#include "dndWorker.h" + +static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); +static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); +static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); + +void mmInitMsgFp(SMnodeMgmt *pMgmt) { + // Requests handled by DNODE + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = mmProcessWriteMsg; + + // Requests handled by MNODE + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_QNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_QNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_SNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_SNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_BNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_BNODE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TABLE_META)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = mmProcessReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = mmProcessReadMsg; + + // Requests handled by VNODE + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = mmProcessWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg; +} + +void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + int32_t code = -1; + SMnodeMsg *pMnodeMsg = NULL; + + MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpcMsg->msgType)]; + if (msgFp == NULL) { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + goto _OVER; + } + + int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen; + pMnodeMsg = taosAllocateQitem(contLen); + if (pMnodeMsg == NULL) { + goto _OVER; + } + + if (mndBuildMsg(pMnodeMsg, pRpcMsg) != 0) { + goto _OVER; + } + + if (pMgmt->singleProc) { + code = (*msgFp)(pDnode, pMnodeMsg); + } else { + code = taosProcPushChild(pMgmt->pProcess, pMnodeMsg, contLen); + } + +_OVER: + + if (code == 0) { + if (!pMgmt->singleProc) { + taosFreeQitem(pMnodeMsg); + } + } else { + bool isReq = (pRpcMsg->msgType & 1U); + + if (isReq) { + if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { + dndSendRedirectRsp(pDnode, pRpcMsg); + } else { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = terrno}; + rpcSendResponse(&rsp); + } + } + taosFreeQitem(pMnodeMsg); + } + + rpcFreeCont(pRpcMsg->pCont); +} + +int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { + return mmWriteToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMnodeMsg); +} + +int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { + return mmWriteToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMnodeMsg); +} + +int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { + return mmWriteToWorker(pDnode, &pDnode->mmgmt.readWorker, pMnodeMsg); +} + +int32_t mmWriteToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg) { + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode == NULL) return -1; + + pMnodeMsg->pMnode = pMnode; + int32_t code = dndWriteMsgToWorker(pWorker, pMnodeMsg, 0); + + mmRelease(pDnode, pMnode); + return code; +} diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 0c30010bd5..825a9ad72d 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -32,91 +32,91 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { // Requests handled by DNODE pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg; // Requests handled by MNODE - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_QNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_QNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_SNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_SNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_BNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_BNODE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TABLE_META)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_QNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_QNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_SNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_SNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_BNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_BNODE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TABLE_META)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = mmProcessRpcMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = mmProcessRpcMsg; // Requests handled by VNODE pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; @@ -138,11 +138,11 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessRpcMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a36168b622..76f8189070 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -664,16 +664,6 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons typedef struct { } SStreamScheduler; -typedef struct SMnodeMsg { - char user[TSDB_USER_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int32_t acctId; - SMnode* pMnode; - int64_t createdTime; - SRpcMsg rpcMsg; - int32_t contLen; - void* pCont; -} SMnodeMsg; #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index db68de5f72..c8da62ae16 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -390,41 +390,26 @@ void mndDestroy(const char *path) { mDebug("mnode is destroyed"); } -SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { - SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle); - return NULL; - } - +int32_t mndBuildMsg(SMnodeMsg *pMnodeMsg, SRpcMsg *pRpcMsg) { if (pRpcMsg->msgType != TDMT_MND_TRANS_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE && pRpcMsg->msgType != TDMT_MND_TELEM_TIMER) { SRpcConnInfo connInfo = {0}; if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { - taosFreeQitem(pMsg); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle); - return NULL; + return -1; } - memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); + memcpy(pMnodeMsg->user, connInfo.user, TSDB_USER_LEN); } - pMsg->pMnode = pMnode; - pMsg->rpcMsg = *pRpcMsg; - pMsg->createdTime = taosGetTimestampSec(); + pMnodeMsg->rpcMsg = *pRpcMsg; + pMnodeMsg->createdTime = taosGetTimestampSec(); + pMnodeMsg->pCont = (char*)pMnodeMsg + sizeof(pMnodeMsg); if (pRpcMsg != NULL) { - mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMsg->user); + mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMnodeMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMnodeMsg->user); } - return pMsg; -} - -void mndCleanupMsg(SMnodeMsg *pMsg) { - mTrace("msg:%p, is destroyed", pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); - pMsg->rpcMsg.pCont = NULL; - taosFreeQitem(pMsg); + return 0; } void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {