shm
This commit is contained in:
parent
a4c761259b
commit
da145d67cf
|
@ -14,157 +14,13 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
// #include "dndSnode.h"
|
#include "smInt.h"
|
||||||
// #include "dm.h"
|
|
||||||
// #include "dndTransport.h"
|
|
||||||
// #include "dndWorker.h"
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t vgId;
|
|
||||||
int32_t refCount;
|
|
||||||
int32_t snVersion;
|
|
||||||
int8_t dropped;
|
|
||||||
char *path;
|
|
||||||
SSnode *pImpl;
|
|
||||||
STaosQueue *pSharedQ;
|
|
||||||
STaosQueue *pUniqueQ;
|
|
||||||
} SSnodeObj;
|
|
||||||
|
|
||||||
static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg);
|
static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
|
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs);
|
||||||
|
|
||||||
static SSnode *dndAcquireSnode(SDnode *pDnode) {
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
SSnode *pSnode = NULL;
|
|
||||||
int32_t refCount = 0;
|
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
|
||||||
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pSnode != NULL) {
|
|
||||||
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
|
||||||
pSnode = pMgmt->pSnode;
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
|
||||||
}
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
|
||||||
|
|
||||||
if (pSnode != NULL) {
|
|
||||||
dTrace("acquire snode, refCount:%d", refCount);
|
|
||||||
}
|
|
||||||
return pSnode;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) {
|
|
||||||
if (pSnode == NULL) return;
|
|
||||||
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
|
||||||
dTrace("release snode, refCount:%d", refCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndReadSnodeFile(SDnode *pDnode) {
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
|
|
||||||
int32_t len = 0;
|
|
||||||
int32_t maxLen = 1024;
|
|
||||||
char *content = calloc(1, maxLen + 1);
|
|
||||||
cJSON *root = NULL;
|
|
||||||
|
|
||||||
char file[PATH_MAX + 20];
|
|
||||||
snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode);
|
|
||||||
|
|
||||||
// FILE *fp = fopen(file, "r");
|
|
||||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
|
|
||||||
if (pFile == NULL) {
|
|
||||||
dDebug("file %s not exist", file);
|
|
||||||
code = 0;
|
|
||||||
goto PRASE_SNODE_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
len = (int32_t)taosReadFile(pFile, content, maxLen);
|
|
||||||
if (len <= 0) {
|
|
||||||
dError("failed to read %s since content is null", file);
|
|
||||||
goto PRASE_SNODE_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
content[len] = 0;
|
|
||||||
root = cJSON_Parse(content);
|
|
||||||
if (root == NULL) {
|
|
||||||
dError("failed to read %s since invalid json format", file);
|
|
||||||
goto PRASE_SNODE_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
|
||||||
if (!deployed || deployed->type != cJSON_Number) {
|
|
||||||
dError("failed to read %s since deployed not found", file);
|
|
||||||
goto PRASE_SNODE_OVER;
|
|
||||||
}
|
|
||||||
pMgmt->deployed = deployed->valueint;
|
|
||||||
|
|
||||||
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
|
||||||
if (!dropped || dropped->type != cJSON_Number) {
|
|
||||||
dError("failed to read %s since dropped not found", file);
|
|
||||||
goto PRASE_SNODE_OVER;
|
|
||||||
}
|
|
||||||
pMgmt->dropped = dropped->valueint;
|
|
||||||
|
|
||||||
code = 0;
|
|
||||||
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
|
||||||
|
|
||||||
PRASE_SNODE_OVER:
|
|
||||||
if (content != NULL) free(content);
|
|
||||||
if (root != NULL) cJSON_Delete(root);
|
|
||||||
if (pFile != NULL) taosCloseFile(&pFile);
|
|
||||||
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndWriteSnodeFile(SDnode *pDnode) {
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
|
|
||||||
char file[PATH_MAX + 20];
|
|
||||||
snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode);
|
|
||||||
|
|
||||||
// FILE *fp = fopen(file, "w");
|
|
||||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
|
||||||
if (pFile == NULL) {
|
|
||||||
terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR;
|
|
||||||
dError("failed to write %s since %s", file, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t len = 0;
|
|
||||||
int32_t maxLen = 1024;
|
|
||||||
char *content = calloc(1, maxLen + 1);
|
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
|
||||||
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
|
|
||||||
len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
|
|
||||||
len += snprintf(content + len, maxLen - len, "}\n");
|
|
||||||
|
|
||||||
taosWriteFile(pFile, content, len);
|
|
||||||
taosFsyncFile(pFile);
|
|
||||||
taosCloseFile(&pFile);
|
|
||||||
free(content);
|
|
||||||
|
|
||||||
char realfile[PATH_MAX + 20];
|
|
||||||
snprintf(realfile, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode);
|
|
||||||
|
|
||||||
if (taosRenameFile(file, realfile) != 0) {
|
|
||||||
terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR;
|
|
||||||
dError("failed to rename %s since %s", file, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndStartSnodeWorker(SDnode *pDnode) {
|
static int32_t dndStartSnodeWorker(SDnode *pDnode) {
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
|
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
|
||||||
|
@ -206,124 +62,6 @@ static void dndStopSnodeWorker(SDnode *pDnode) {
|
||||||
taosArrayDestroy(pMgmt->uniqueWorkers);
|
taosArrayDestroy(pMgmt->uniqueWorkers);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
|
|
||||||
pOption->pDnode = pDnode;
|
|
||||||
pOption->sendReqFp = dndSendReqToDnode;
|
|
||||||
pOption->sendMnodeReqFp = dndSendReqToMnode;
|
|
||||||
pOption->sendRedirectRspFp = dndSendRedirectRsp;
|
|
||||||
pOption->dnodeId = pDnode->dnodeId;
|
|
||||||
pOption->clusterId = pDnode->clusterId;
|
|
||||||
pOption->sver = tsVersion;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndOpenSnode(SDnode *pDnode) {
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
SSnode *pSnode = dndAcquireSnode(pDnode);
|
|
||||||
if (pSnode != NULL) {
|
|
||||||
dndReleaseSnode(pDnode, pSnode);
|
|
||||||
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
|
||||||
dError("failed to create snode since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSnodeOpt option = {0};
|
|
||||||
dndBuildSnodeOption(pDnode, &option);
|
|
||||||
|
|
||||||
pSnode = sndOpen(pDnode->dir.snode, &option);
|
|
||||||
if (pSnode == NULL) {
|
|
||||||
dError("failed to open snode since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dndStartSnodeWorker(pDnode) != 0) {
|
|
||||||
dError("failed to start snode worker since %s", terrstr());
|
|
||||||
sndClose(pSnode);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMgmt->deployed = 1;
|
|
||||||
if (dndWriteSnodeFile(pDnode) != 0) {
|
|
||||||
pMgmt->deployed = 0;
|
|
||||||
dError("failed to write snode file since %s", terrstr());
|
|
||||||
dndStopSnodeWorker(pDnode);
|
|
||||||
sndClose(pSnode);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosWLockLatch(&pMgmt->latch);
|
|
||||||
pMgmt->pSnode = pSnode;
|
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
|
||||||
|
|
||||||
dInfo("snode open successfully");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dndDropSnode(SDnode *pDnode) {
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
|
|
||||||
SSnode *pSnode = dndAcquireSnode(pDnode);
|
|
||||||
if (pSnode == NULL) {
|
|
||||||
dError("failed to drop snode since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
|
||||||
pMgmt->dropped = 1;
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
|
||||||
|
|
||||||
if (dndWriteSnodeFile(pDnode) != 0) {
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
|
||||||
pMgmt->dropped = 0;
|
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
|
||||||
|
|
||||||
dndReleaseSnode(pDnode, pSnode);
|
|
||||||
dError("failed to drop snode since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dndReleaseSnode(pDnode, pSnode);
|
|
||||||
dndStopSnodeWorker(pDnode);
|
|
||||||
pMgmt->deployed = 0;
|
|
||||||
dndWriteSnodeFile(pDnode);
|
|
||||||
sndClose(pSnode);
|
|
||||||
pMgmt->pSnode = NULL;
|
|
||||||
sndDestroy(pDnode->dir.snode);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|
||||||
SDCreateSnodeReq createReq = {0};
|
|
||||||
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (createReq.dnodeId != pDnode->dnodeId) {
|
|
||||||
terrno = TSDB_CODE_NODE_INVALID_OPTION;
|
|
||||||
dError("failed to create snode since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
return dndOpenSnode(pDnode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|
||||||
SDDropSnodeReq dropReq = {0};
|
|
||||||
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dropReq.dnodeId != pDnode->dnodeId) {
|
|
||||||
terrno = TSDB_CODE_NODE_INVALID_OPTION;
|
|
||||||
dError("failed to drop snode since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
return dndDropSnode(pDnode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
|
static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
/*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
|
/*SSnodeMgmt *pMgmt = &pDnode->smgmt;*/
|
||||||
int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
|
int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||||
|
@ -459,33 +197,3 @@ void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
|
dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndInitSnode(SDnode *pDnode) {
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
taosInitRWLatch(&pMgmt->latch);
|
|
||||||
|
|
||||||
if (dndReadSnodeFile(pDnode) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMgmt->dropped) {
|
|
||||||
dInfo("snode has been deployed and needs to be deleted");
|
|
||||||
sndDestroy(pDnode->dir.snode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!pMgmt->deployed) return 0;
|
|
||||||
|
|
||||||
return dndOpenSnode(pDnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
void dndCleanupSnode(SDnode *pDnode) {
|
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
|
||||||
if (pMgmt->pSnode) {
|
|
||||||
dndStopSnodeWorker(pDnode);
|
|
||||||
sndClose(pMgmt->pSnode);
|
|
||||||
pMgmt->pSnode = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
Loading…
Reference in New Issue