add qnode
This commit is contained in:
parent
6d050231d8
commit
938bd01b17
|
@ -78,24 +78,14 @@ void qndClose(SQnode *pQnode);
|
|||
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
|
||||
|
||||
/**
|
||||
* @brief Process a query message.
|
||||
* @brief Process a query or fetch message.
|
||||
*
|
||||
* @param pQnode The qnode object.
|
||||
* @param pMsg The request message
|
||||
* @param pRsp The response message
|
||||
* @return int32_t 0 for success, -1 for failure
|
||||
*/
|
||||
int32_t qndProcessQueryReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
/**
|
||||
* @brief Process a fetch message.
|
||||
*
|
||||
* @param pQnode The qnode object.
|
||||
* @param pMsg The request message
|
||||
* @param pRsp The response message
|
||||
* @return int32_t 0 for success, -1 for failure
|
||||
*/
|
||||
int32_t qndProcessFetchReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -69,6 +69,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107)
|
||||
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
|
||||
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109)
|
||||
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A)
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
|
||||
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
|
||||
|
|
|
@ -54,8 +54,20 @@ extern int32_t dDebugFlag;
|
|||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
|
||||
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EDndWorkerType;
|
||||
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
|
||||
|
||||
typedef struct {
|
||||
EDndWorkerType type;
|
||||
const char *name;
|
||||
int32_t minNum;
|
||||
int32_t maxNum;
|
||||
FProcessItem fp;
|
||||
SDnode *pDnode;
|
||||
taos_queue queue;
|
||||
SWorkerPool pool;
|
||||
} SDnodeWorker;
|
||||
|
||||
typedef struct {
|
||||
char *dnode;
|
||||
char *mnode;
|
||||
|
@ -100,16 +112,13 @@ typedef struct {
|
|||
} SMnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
char *file;
|
||||
SQnode *pQnode;
|
||||
SRWLatch latch;
|
||||
taos_queue pQueryQ;
|
||||
taos_queue pFetchQ;
|
||||
SWorkerPool queryPool;
|
||||
SWorkerPool fetchPool;
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SQnode *pQnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker queryWorker;
|
||||
SDnodeWorker fetchWorker;
|
||||
} SQnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DND_WORKER_H_
|
||||
#define _TD_DND_WORKER_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
|
||||
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum,
|
||||
int32_t maxNum, FProcessItem fp);
|
||||
void dndCleanupWorker(SDnodeWorker *pWorker);
|
||||
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_WORKER_H_*/
|
|
@ -17,30 +17,9 @@
|
|||
#include "dndQnode.h"
|
||||
#include "dndDnode.h"
|
||||
#include "dndTransport.h"
|
||||
#include "dndWorker.h"
|
||||
|
||||
static int32_t dndInitQnodeQueryWorker(SDnode *pDnode);
|
||||
static int32_t dndInitQnodeFetchWorker(SDnode *pDnode);
|
||||
static void dndCleanupQnodeQueryWorker(SDnode *pDnode);
|
||||
static void dndCleanupQnodeFetchWorker(SDnode *pDnode);
|
||||
static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode);
|
||||
static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode);
|
||||
static void dndFreeQnodeQueryQueue(SDnode *pDnode);
|
||||
static void dndFreeQnodeFetchQueue(SDnode *pDnode);
|
||||
|
||||
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
|
||||
|
||||
static int32_t dndStartQnodeWorker(SDnode *pDnode);
|
||||
static void dndStopQnodeWorker(SDnode *pDnode);
|
||||
|
||||
static SQnode *dndAcquireQnode(SDnode *pDnode);
|
||||
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode);
|
||||
|
||||
static int32_t dndReadQnodeFile(SDnode *pDnode);
|
||||
static int32_t dndWriteQnodeFile(SDnode *pDnode);
|
||||
|
||||
static int32_t dndOpenQnode(SDnode *pDnode);
|
||||
static int32_t dndDropQnode(SDnode *pDnode);
|
||||
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
static SQnode *dndAcquireQnode(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
|
@ -85,44 +64,47 @@ static int32_t dndReadQnodeFile(SDnode *pDnode) {
|
|||
char *content = calloc(1, maxLen + 1);
|
||||
cJSON *root = NULL;
|
||||
|
||||
FILE *fp = fopen(pMgmt->file, "r");
|
||||
char file[PATH_MAX + 20];
|
||||
snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
|
||||
|
||||
FILE *fp = fopen(file, "r");
|
||||
if (fp == NULL) {
|
||||
dDebug("file %s not exist", pMgmt->file);
|
||||
dDebug("file %s not exist", file);
|
||||
code = 0;
|
||||
goto PRASE_MNODE_OVER;
|
||||
goto PRASE_QNODE_OVER;
|
||||
}
|
||||
|
||||
len = (int32_t)fread(content, 1, maxLen, fp);
|
||||
if (len <= 0) {
|
||||
dError("failed to read %s since content is null", pMgmt->file);
|
||||
goto PRASE_MNODE_OVER;
|
||||
dError("failed to read %s since content is null", file);
|
||||
goto PRASE_QNODE_OVER;
|
||||
}
|
||||
|
||||
content[len] = 0;
|
||||
root = cJSON_Parse(content);
|
||||
if (root == NULL) {
|
||||
dError("failed to read %s since invalid json format", pMgmt->file);
|
||||
goto PRASE_MNODE_OVER;
|
||||
dError("failed to read %s since invalid json format", file);
|
||||
goto PRASE_QNODE_OVER;
|
||||
}
|
||||
|
||||
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
||||
if (!deployed || deployed->type != cJSON_Number) {
|
||||
dError("failed to read %s since deployed not found", pMgmt->file);
|
||||
goto PRASE_MNODE_OVER;
|
||||
dError("failed to read %s since deployed not found", file);
|
||||
goto PRASE_QNODE_OVER;
|
||||
}
|
||||
pMgmt->deployed = deployed->valueint;
|
||||
|
||||
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
||||
if (!dropped || dropped->type != cJSON_Number) {
|
||||
dError("failed to read %s since dropped not found", pMgmt->file);
|
||||
goto PRASE_MNODE_OVER;
|
||||
dError("failed to read %s since dropped not found", file);
|
||||
goto PRASE_QNODE_OVER;
|
||||
}
|
||||
pMgmt->dropped = dropped->valueint;
|
||||
|
||||
code = 0;
|
||||
dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
|
||||
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
||||
|
||||
PRASE_MNODE_OVER:
|
||||
PRASE_QNODE_OVER:
|
||||
if (content != NULL) free(content);
|
||||
if (root != NULL) cJSON_Delete(root);
|
||||
if (fp != NULL) fclose(fp);
|
||||
|
@ -134,8 +116,8 @@ PRASE_MNODE_OVER:
|
|||
static int32_t dndWriteQnodeFile(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
|
||||
char file[PATH_MAX + 20] = {0};
|
||||
snprintf(file, sizeof(file), "%s.bak", pMgmt->file);
|
||||
char file[PATH_MAX + 20];
|
||||
snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
|
||||
|
||||
FILE *fp = fopen(file, "w");
|
||||
if (fp == NULL) {
|
||||
|
@ -154,41 +136,34 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) {
|
|||
len += snprintf(content + len, maxLen - len, "}\n");
|
||||
|
||||
fwrite(content, 1, len, fp);
|
||||
taosFfetchFile(fileno(fp));
|
||||
taosFsyncFile(fileno(fp));
|
||||
fclose(fp);
|
||||
free(content);
|
||||
|
||||
if (taosRenameFile(file, pMgmt->file) != 0) {
|
||||
if (taosRenameFile(file, file) != 0) {
|
||||
terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
|
||||
dError("failed to rename %s since %s", pMgmt->file, terrstr());
|
||||
dError("failed to rename %s since %s", file, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
|
||||
dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dndStartQnodeWorker(SDnode *pDnode) {
|
||||
if (dndInitQnodeQueryWorker(pDnode) != 0) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1,
|
||||
(FProcessItem)dndProcessQnodeQueue) != 0) {
|
||||
dError("failed to start qnode query worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dndInitQnodeFetchWorker(pDnode) != 0) {
|
||||
if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1,
|
||||
(FProcessItem)dndProcessQnodeQueue) != 0) {
|
||||
dError("failed to start qnode fetch worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dndAllocQnodeQueryQueue(pDnode) != 0) {
|
||||
dError("failed to alloc qnode query queue since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dndAllocQnodeFetchQueue(pDnode) != 0) {
|
||||
dError("failed to alloc qnode fetch queue since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -199,15 +174,12 @@ static void dndStopQnodeWorker(SDnode *pDnode) {
|
|||
pMgmt->deployed = 0;
|
||||
taosWUnLockLatch(&pMgmt->latch);
|
||||
|
||||
while (pMgmt->refCount > 1) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pMgmt->pQueryQ)) taosMsleep(10);
|
||||
while (!taosQueueEmpty(pMgmt->pFetchQ)) taosMsleep(10);
|
||||
while (pMgmt->refCount > 1) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
|
||||
dndCleanupQnodeQueryWorker(pDnode);
|
||||
dndCleanupQnodeFetchWorker(pDnode);
|
||||
|
||||
dndFreeQnodeQueryQueue(pDnode);
|
||||
dndFreeQnodeFetchQueue(pDnode);
|
||||
dndCleanupWorker(&pMgmt->queryWorker);
|
||||
dndCleanupWorker(&pMgmt->fetchWorker);
|
||||
}
|
||||
|
||||
static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
|
||||
|
@ -230,28 +202,17 @@ static int32_t dndOpenQnode(SDnode *pDnode) {
|
|||
dError("failed to open qnode since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
pMgmt->deployed = 1;
|
||||
|
||||
int32_t code = dndWriteQnodeFile(pDnode);
|
||||
if (code != 0) {
|
||||
dError("failed to write qnode file since %s", terrstr());
|
||||
code = terrno;
|
||||
pMgmt->deployed = 0;
|
||||
if (dndStartQnodeWorker(pDnode) != 0) {
|
||||
dError("failed to start qnode worker since %s", terrstr());
|
||||
qndClose(pQnode);
|
||||
// qndDestroy(pDnode->dir.qnode);
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
code = dndStartQnodeWorker(pDnode);
|
||||
if (code != 0) {
|
||||
dError("failed to start qnode worker since %s", terrstr());
|
||||
code = terrno;
|
||||
pMgmt->deployed = 0;
|
||||
if (dndWriteQnodeFile(pDnode) != 0) {
|
||||
dError("failed to write qnode file since %s", terrstr());
|
||||
dndStopQnodeWorker(pDnode);
|
||||
qndClose(pQnode);
|
||||
// qndDestroy(pDnode->dir.qnode);
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -289,7 +250,6 @@ static int32_t dndDropQnode(SDnode *pDnode) {
|
|||
|
||||
dndReleaseQnode(pDnode, pQnode);
|
||||
dndStopQnodeWorker(pDnode);
|
||||
dndWriteQnodeFile(pDnode);
|
||||
qndClose(pQnode);
|
||||
pMgmt->pQnode = NULL;
|
||||
// qndDestroy(pDnode->dir.qnode);
|
||||
|
@ -324,13 +284,11 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
|||
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
SRpcMsg *pRsp = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
|
||||
|
||||
SQnode *pQnode = dndAcquireQnode(pDnode);
|
||||
if (pQnode == NULL) {
|
||||
code = -1;
|
||||
} else {
|
||||
code = qndProcessQueryReq(pQnode, pMsg, &pRsp);
|
||||
if (pQnode != NULL) {
|
||||
code = qndProcessMsg(pQnode, pMsg, &pRsp);
|
||||
}
|
||||
|
||||
if (pRsp != NULL) {
|
||||
|
@ -347,135 +305,36 @@ static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
|
||||
int32_t code = 0;
|
||||
static void dndWriteQnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
|
||||
int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
|
||||
|
||||
if (pQnode == NULL || pQueue == NULL) {
|
||||
code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
|
||||
} else {
|
||||
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
if (pMsg == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
*pMsg = *pRpcMsg;
|
||||
if (taosWriteQitem(pQueue, pMsg) != 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
SQnode *pQnode = dndAcquireQnode(pDnode);
|
||||
if (pQnode != NULL) {
|
||||
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
|
||||
}
|
||||
dndReleaseQnode(pDnode, pQnode);
|
||||
|
||||
if (code != 0) {
|
||||
if (pRpcMsg->msgType & 1u) {
|
||||
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
|
||||
if (pMsg->msgType & 1u) {
|
||||
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
|
||||
rpcSendResponse(&rsp);
|
||||
}
|
||||
rpcFreeCont(pRpcMsg->pCont);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
SQnode *pQnode = dndAcquireQnode(pDnode);
|
||||
dndWriteQnodeMsgToQueue(pQnode, pMgmt->pQueryQ, pMsg);
|
||||
dndReleaseQnode(pDnode, pQnode);
|
||||
dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg);
|
||||
}
|
||||
|
||||
void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
SQnode *pQnode = dndAcquireQnode(pDnode);
|
||||
dndWriteQnodeMsgToQueue(pQnode, pMgmt->pFetchQ, pMsg);
|
||||
dndReleaseQnode(pDnode, pQnode);
|
||||
}
|
||||
|
||||
static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
pMgmt->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue);
|
||||
if (pMgmt->pQueryQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndFreeQnodeQueryQueue(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
tWorkerFreeQueue(&pMgmt->queryPool, pMgmt->pQueryQ);
|
||||
pMgmt->pQueryQ = NULL;
|
||||
}
|
||||
|
||||
static int32_t dndInitQnodeQueryWorker(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
SWorkerPool *pPool = &pMgmt->queryPool;
|
||||
pPool->name = "qnode-query";
|
||||
pPool->min = 0;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
dDebug("qnode query worker is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndCleanupQnodeQueryWorker(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
tWorkerCleanup(&pMgmt->queryPool);
|
||||
dDebug("qnode query worker is closed");
|
||||
}
|
||||
|
||||
static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
pMgmt->pFetchQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue);
|
||||
if (pMgmt->pFetchQ == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndFreeQnodeFetchQueue(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
tWorkerFreeQueue(&pMgmt->fetchPool, pMgmt->pFetchQ);
|
||||
pMgmt->pFetchQ = NULL;
|
||||
}
|
||||
|
||||
static int32_t dndInitQnodeFetchWorker(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
SWorkerPool *pPool = &pMgmt->fetchPool;
|
||||
pPool->name = "qnode-fetch";
|
||||
pPool->min = 0;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
dDebug("qnode fetch worker is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndCleanupQnodeFetchWorker(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
tWorkerCleanup(&pMgmt->fetchPool);
|
||||
dDebug("qnode fetch worker is closed");
|
||||
dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t dndInitQnode(SDnode *pDnode) {
|
||||
dInfo("dnode-qnode start to init");
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
taosInitRWLatch(&pMgmt->latch);
|
||||
|
||||
char path[PATH_MAX];
|
||||
snprintf(path, PATH_MAX, "%s/qnode.json", pDnode->dir.dnode);
|
||||
pMgmt->file = strdup(path);
|
||||
if (pMgmt->file == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dndReadQnodeFile(pDnode) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -488,11 +347,9 @@ int32_t dndInitQnode(SDnode *pDnode) {
|
|||
|
||||
void dndCleanupQnode(SDnode *pDnode) {
|
||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||
|
||||
dInfo("dnode-qnode start to clean up");
|
||||
if (pMgmt->pQnode) dndStopQnodeWorker(pDnode);
|
||||
tfree(pMgmt->file);
|
||||
qndClose(pMgmt->pQnode);
|
||||
pMgmt->pQnode = NULL;
|
||||
dInfo("dnode-qnode is cleaned up");
|
||||
if (pMgmt->pQnode) {
|
||||
dndStopQnodeWorker(pDnode);
|
||||
qndClose(pMgmt->pQnode);
|
||||
pMgmt->pQnode = NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http:www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dndWorker.h"
|
||||
|
||||
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum,
|
||||
int32_t maxNum, FProcessItem fp) {
|
||||
if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || fp == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pWorker->type = type;
|
||||
pWorker->name = name;
|
||||
pWorker->minNum = minNum;
|
||||
pWorker->maxNum = maxNum;
|
||||
pWorker->fp = fp;
|
||||
pWorker->pDnode = pDnode;
|
||||
|
||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||
SWorkerPool *pPool = &pWorker->pool;
|
||||
pPool->min = minNum;
|
||||
pPool->max = maxNum;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pWorker->queue = tWorkerAllocQueue(&pPool, pDnode, fp);
|
||||
if (pWorker->queue == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dndCleanupWorker(SDnodeWorker *pWorker) {
|
||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||
while (!taosQueueEmpty(pWorker->queue)) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
tWorkerCleanup(&pWorker->pool);
|
||||
tWorkerFreeQueue(&pWorker->pool, pWorker->queue);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) {
|
||||
if (pWorker == NULL || pWorker->queue == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *pMsg = taosAllocateQitem(contLen);
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pMsg, pCont, contLen);
|
||||
|
||||
if (taosWriteQitem(pWorker, pMsg) != 0) {
|
||||
taosFreeItem(pMsg);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -79,6 +79,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PARA, "Invalid parameters")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")
|
||||
|
|
Loading…
Reference in New Issue