diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 35bc79da3d..67476efd46 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -14,157 +14,13 @@ */ #define _DEFAULT_SOURCE -// #include "dndSnode.h" -// #include "dm.h" -// #include "dndTransport.h" -// #include "dndWorker.h" +#include "smInt.h" -#if 0 - -typedef struct { - int32_t vgId; - int32_t refCount; - int32_t snVersion; - int8_t dropped; - char *path; - SSnode *pImpl; - STaosQueue *pSharedQ; - STaosQueue *pUniqueQ; -} SSnodeObj; static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); -static SSnode *dndAcquireSnode(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - SSnode *pSnode = NULL; - int32_t refCount = 0; - - taosRLockLatch(&pMgmt->latch); - if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pSnode != NULL) { - refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); - pSnode = pMgmt->pSnode; - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - } - taosRUnLockLatch(&pMgmt->latch); - - if (pSnode != NULL) { - dTrace("acquire snode, refCount:%d", refCount); - } - return pSnode; -} - -static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { - if (pSnode == NULL) return; - - SSnodeMgmt *pMgmt = &pDnode->smgmt; - taosRLockLatch(&pMgmt->latch); - int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - taosRUnLockLatch(&pMgmt->latch); - dTrace("release snode, refCount:%d", refCount); -} - -static int32_t dndReadSnodeFile(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 1024; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "r"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); - if (pFile == NULL) { - dDebug("file %s not exist", file); - code = 0; - goto PRASE_SNODE_OVER; - } - - len = (int32_t)taosReadFile(pFile, content, maxLen); - if (len <= 0) { - dError("failed to read %s since content is null", file); - goto PRASE_SNODE_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", file); - goto PRASE_SNODE_OVER; - } - - cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); - if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", file); - goto PRASE_SNODE_OVER; - } - pMgmt->deployed = deployed->valueint; - - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", file); - goto PRASE_SNODE_OVER; - } - pMgmt->dropped = dropped->valueint; - - code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); - -PRASE_SNODE_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (pFile != NULL) taosCloseFile(&pFile); - - terrno = code; - return code; -} - -static int32_t dndWriteSnodeFile(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "w"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR; - dError("failed to write %s since %s", file, terrstr()); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 1024; - char *content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); - len += snprintf(content + len, maxLen - len, "}\n"); - - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); - taosCloseFile(&pFile); - free(content); - - char realfile[PATH_MAX + 20]; - snprintf(realfile, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); - - if (taosRenameFile(file, realfile) != 0) { - terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", file, terrstr()); - return -1; - } - - dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); - return 0; -} - static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); @@ -206,124 +62,6 @@ static void dndStopSnodeWorker(SDnode *pDnode) { taosArrayDestroy(pMgmt->uniqueWorkers); } -static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { - pOption->pDnode = pDnode; - pOption->sendReqFp = dndSendReqToDnode; - pOption->sendMnodeReqFp = dndSendReqToMnode; - pOption->sendRedirectRspFp = dndSendRedirectRsp; - pOption->dnodeId = pDnode->dnodeId; - pOption->clusterId = pDnode->clusterId; - pOption->sver = tsVersion; -} - -static int32_t dndOpenSnode(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode != NULL) { - dndReleaseSnode(pDnode, pSnode); - terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; - dError("failed to create snode since %s", terrstr()); - return -1; - } - - SSnodeOpt option = {0}; - dndBuildSnodeOption(pDnode, &option); - - pSnode = sndOpen(pDnode->dir.snode, &option); - if (pSnode == NULL) { - dError("failed to open snode since %s", terrstr()); - return -1; - } - - if (dndStartSnodeWorker(pDnode) != 0) { - dError("failed to start snode worker since %s", terrstr()); - sndClose(pSnode); - return -1; - } - - pMgmt->deployed = 1; - if (dndWriteSnodeFile(pDnode) != 0) { - pMgmt->deployed = 0; - dError("failed to write snode file since %s", terrstr()); - dndStopSnodeWorker(pDnode); - sndClose(pSnode); - return -1; - } - - taosWLockLatch(&pMgmt->latch); - pMgmt->pSnode = pSnode; - taosWUnLockLatch(&pMgmt->latch); - - dInfo("snode open successfully"); - return 0; -} - -static int32_t dndDropSnode(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - - SSnode *pSnode = dndAcquireSnode(pDnode); - if (pSnode == NULL) { - dError("failed to drop snode since %s", terrstr()); - return -1; - } - - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 1; - taosRUnLockLatch(&pMgmt->latch); - - if (dndWriteSnodeFile(pDnode) != 0) { - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 0; - taosRUnLockLatch(&pMgmt->latch); - - dndReleaseSnode(pDnode, pSnode); - dError("failed to drop snode since %s", terrstr()); - return -1; - } - - dndReleaseSnode(pDnode, pSnode); - dndStopSnodeWorker(pDnode); - pMgmt->deployed = 0; - dndWriteSnodeFile(pDnode); - sndClose(pSnode); - pMgmt->pSnode = NULL; - sndDestroy(pDnode->dir.snode); - - return 0; -} - -int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { - SDCreateSnodeReq createReq = {0}; - if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (createReq.dnodeId != pDnode->dnodeId) { - terrno = TSDB_CODE_NODE_INVALID_OPTION; - dError("failed to create snode since %s", terrstr()); - return -1; - } else { - return dndOpenSnode(pDnode); - } -} - -int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { - SDDropSnodeReq dropReq = {0}; - if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (dropReq.dnodeId != pDnode->dnodeId) { - terrno = TSDB_CODE_NODE_INVALID_OPTION; - dError("failed to drop snode since %s", terrstr()); - return -1; - } else { - return dndDropSnode(pDnode); - } -} - static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { /*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/ int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; @@ -459,33 +197,3 @@ void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); } - -int32_t dndInitSnode(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - taosInitRWLatch(&pMgmt->latch); - - if (dndReadSnodeFile(pDnode) != 0) { - return -1; - } - - if (pMgmt->dropped) { - dInfo("snode has been deployed and needs to be deleted"); - sndDestroy(pDnode->dir.snode); - return 0; - } - - if (!pMgmt->deployed) return 0; - - return dndOpenSnode(pDnode); -} - -void dndCleanupSnode(SDnode *pDnode) { - SSnodeMgmt *pMgmt = &pDnode->smgmt; - if (pMgmt->pSnode) { - dndStopSnodeWorker(pDnode); - sndClose(pMgmt->pSnode); - pMgmt->pSnode = NULL; - } -} - -#endif \ No newline at end of file