rename node type
This commit is contained in:
parent
c53489204c
commit
79963b9c34
|
@ -118,7 +118,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
|
static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
|
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
|
||||||
if (pWrapper != NULL) {
|
if (pWrapper != NULL) {
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper);
|
||||||
|
@ -146,7 +146,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
|
static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
|
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
|
||||||
if (pWrapper == NULL) {
|
if (pWrapper == NULL) {
|
||||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||||
|
|
|
@ -26,7 +26,7 @@ static struct {
|
||||||
char apolloUrl[PATH_MAX];
|
char apolloUrl[PATH_MAX];
|
||||||
SArray *pArgs; // SConfigPair
|
SArray *pArgs; // SConfigPair
|
||||||
SDnode *pDnode;
|
SDnode *pDnode;
|
||||||
ENodeType ntype;
|
EDndType ntype;
|
||||||
} global = {0};
|
} global = {0};
|
||||||
|
|
||||||
static void dndStopDnode(int signum, void *info, void *ctx) {
|
static void dndStopDnode(int signum, void *info, void *ctx) {
|
||||||
|
|
|
@ -49,7 +49,7 @@ extern "C" {
|
||||||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
|
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } EDndType;
|
||||||
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
|
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
|
||||||
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
|
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
|
||||||
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
|
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
|
||||||
|
@ -92,7 +92,7 @@ typedef struct SMgmtWrapper {
|
||||||
char *path;
|
char *path;
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
ENodeType ntype;
|
EDndType ntype;
|
||||||
bool deployed;
|
bool deployed;
|
||||||
bool required;
|
bool required;
|
||||||
EProcType procType;
|
EProcType procType;
|
||||||
|
@ -126,7 +126,7 @@ typedef struct SDnode {
|
||||||
int32_t numOfDisks;
|
int32_t numOfDisks;
|
||||||
uint16_t serverPort;
|
uint16_t serverPort;
|
||||||
bool dropped;
|
bool dropped;
|
||||||
ENodeType ntype;
|
EDndType ntype;
|
||||||
EDndStatus status;
|
EDndStatus status;
|
||||||
EDndEvent event;
|
EDndEvent event;
|
||||||
SStartupReq startup;
|
SStartupReq startup;
|
||||||
|
@ -137,8 +137,8 @@ typedef struct SDnode {
|
||||||
|
|
||||||
// dndEnv.c
|
// dndEnv.c
|
||||||
const char *dndStatStr(EDndStatus stat);
|
const char *dndStatStr(EDndStatus stat);
|
||||||
const char *dndNodeLogStr(ENodeType ntype);
|
const char *dndNodeLogStr(EDndType ntype);
|
||||||
const char *dndNodeProcStr(ENodeType ntype);
|
const char *dndNodeProcStr(EDndType ntype);
|
||||||
const char *dndEventStr(EDndEvent ev);
|
const char *dndEventStr(EDndEvent ev);
|
||||||
|
|
||||||
// dndExec.c
|
// dndExec.c
|
||||||
|
@ -156,7 +156,7 @@ int32_t dndWriteShmFile(SDnode *pDnode);
|
||||||
EDndStatus dndGetStatus(SDnode *pDnode);
|
EDndStatus dndGetStatus(SDnode *pDnode);
|
||||||
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
|
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
|
||||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
|
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
|
||||||
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
|
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType nType);
|
||||||
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
|
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
|
||||||
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
|
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
|
||||||
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
|
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
|
||||||
|
|
|
@ -71,7 +71,7 @@ const char *dndStatStr(EDndStatus status) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *dndNodeLogStr(ENodeType ntype) {
|
const char *dndNodeLogStr(EDndType ntype) {
|
||||||
switch (ntype) {
|
switch (ntype) {
|
||||||
case VNODES:
|
case VNODES:
|
||||||
return "vnode";
|
return "vnode";
|
||||||
|
@ -88,7 +88,7 @@ const char *dndNodeLogStr(ENodeType ntype) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *dndNodeProcStr(ENodeType ntype) {
|
const char *dndNodeProcStr(EDndType ntype) {
|
||||||
switch (ntype) {
|
switch (ntype) {
|
||||||
case VNODES:
|
case VNODES:
|
||||||
return "taosv";
|
return "taosv";
|
||||||
|
|
|
@ -66,7 +66,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
|
static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
|
||||||
char tstr[8] = {0};
|
char tstr[8] = {0};
|
||||||
char *args[6] = {0};
|
char *args[6] = {0};
|
||||||
snprintf(tstr, sizeof(tstr), "%d", n);
|
snprintf(tstr, sizeof(tstr), "%d", n);
|
||||||
|
@ -97,7 +97,7 @@ static void dndProcessProcHandle(void *handle) {
|
||||||
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
||||||
dInfo("dnode run in single process");
|
dInfo("dnode run in single process");
|
||||||
|
|
||||||
for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
|
for (EDndType n = DNODE; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
pWrapper->required = dndRequireNode(pWrapper);
|
pWrapper->required = dndRequireNode(pWrapper);
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
|
@ -110,7 +110,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
||||||
|
|
||||||
dndSetStatus(pDnode, DND_STAT_RUNNING);
|
dndSetStatus(pDnode, DND_STAT_RUNNING);
|
||||||
|
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
for (EDndType n = 0; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
if (pWrapper->fp.startFp == NULL) continue;
|
if (pWrapper->fp.startFp == NULL) continue;
|
||||||
|
@ -142,7 +142,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
|
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
pWrapper->required = dndRequireNode(pWrapper);
|
pWrapper->required = dndRequireNode(pWrapper);
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
|
@ -170,7 +170,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
|
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
|
|
||||||
|
@ -203,7 +203,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
dInfo("dnode is about to stop");
|
dInfo("dnode is about to stop");
|
||||||
dndSetStatus(pDnode, DND_STAT_STOPPED);
|
dndSetStatus(pDnode, DND_STAT_STOPPED);
|
||||||
|
|
||||||
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
|
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
if (pDnode->ntype == NODE_MAX) continue;
|
if (pDnode->ntype == NODE_MAX) continue;
|
||||||
|
@ -218,13 +218,13 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
|
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
if (pDnode->ntype == NODE_MAX) continue;
|
if (pDnode->ntype == NODE_MAX) continue;
|
||||||
|
|
||||||
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
|
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
|
||||||
dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
|
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
|
||||||
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
|
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
|
||||||
dndNewProc(pWrapper, n);
|
dndNewProc(pWrapper, n);
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,7 +164,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
|
for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
|
||||||
snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype));
|
snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype));
|
||||||
cJSON *shmid = cJSON_GetObjectItem(root, itemName);
|
cJSON *shmid = cJSON_GetObjectItem(root, itemName);
|
||||||
if (shmid && shmid->type == cJSON_Number) {
|
if (shmid && shmid->type == cJSON_Number) {
|
||||||
|
@ -180,7 +180,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
|
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
|
||||||
for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
|
for (EDndType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
if (pWrapper->shm.id >= 0) {
|
if (pWrapper->shm.id >= 0) {
|
||||||
dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size);
|
dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size);
|
||||||
|
@ -226,7 +226,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
len += snprintf(content + len, MAXLEN - len, "{\n");
|
len += snprintf(content + len, MAXLEN - len, "{\n");
|
||||||
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
|
for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id);
|
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id);
|
||||||
if (ntype == NODE_MAX - 1) {
|
if (ntype == NODE_MAX - 1) {
|
||||||
|
|
|
@ -46,7 +46,7 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndClearVars(SDnode *pDnode) {
|
static void dndClearVars(SDnode *pDnode) {
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
for (EDndType n = 0; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
|
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
|
||||||
taosMemoryFreeClear(pMgmt->path);
|
taosMemoryFreeClear(pMgmt->path);
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
|
||||||
smSetMgmtFp(&pDnode->wrappers[SNODE]);
|
smSetMgmtFp(&pDnode->wrappers[SNODE]);
|
||||||
bmSetMgmtFp(&pDnode->wrappers[BNODE]);
|
bmSetMgmtFp(&pDnode->wrappers[BNODE]);
|
||||||
|
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
for (EDndType n = 0; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
|
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
|
||||||
pWrapper->path = strdup(path);
|
pWrapper->path = strdup(path);
|
||||||
|
@ -134,7 +134,7 @@ _OVER:
|
||||||
void dndClose(SDnode *pDnode) {
|
void dndClose(SDnode *pDnode) {
|
||||||
if (pDnode == NULL) return;
|
if (pDnode == NULL) return;
|
||||||
|
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
for (EDndType n = 0; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
dndCloseNode(pWrapper);
|
dndCloseNode(pWrapper);
|
||||||
}
|
}
|
||||||
|
@ -149,7 +149,7 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
|
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType ntype) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
SMgmtWrapper *pRetWrapper = pWrapper;
|
SMgmtWrapper *pRetWrapper = pWrapper;
|
||||||
|
|
||||||
|
|
|
@ -307,7 +307,7 @@ void dndCleanupTrans(SDnode *pDnode) {
|
||||||
int32_t dndInitMsgHandle(SDnode *pDnode) {
|
int32_t dndInitMsgHandle(SDnode *pDnode) {
|
||||||
STransMgmt *pMgmt = &pDnode->trans;
|
STransMgmt *pMgmt = &pDnode->trans;
|
||||||
|
|
||||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
for (EDndType n = 0; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
|
|
||||||
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
|
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
|
||||||
|
|
|
@ -478,6 +478,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
|
||||||
(*HandleFp)(handle);
|
(*HandleFp)(handle);
|
||||||
h = taosHashIterate(pProc->hash, h);
|
h = taosHashIterate(pProc->hash, h);
|
||||||
}
|
}
|
||||||
|
taosHashClear(pProc->hash);
|
||||||
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
|
taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -485,7 +486,7 @@ void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, c
|
||||||
ProcFuncType ftype) {
|
ProcFuncType ftype) {
|
||||||
int32_t retry = 0;
|
int32_t retry = 0;
|
||||||
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
|
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
|
||||||
uInfo("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
|
uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
|
||||||
retry++;
|
retry++;
|
||||||
taosMsleep(retry);
|
taosMsleep(retry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,10 +10,20 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#include "tlog.h"
|
||||||
#include "tprocess.h"
|
#include "tprocess.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "trpc.h"
|
|
||||||
#include "tlog.h"
|
typedef struct STestMsg {
|
||||||
|
uint16_t msgType;
|
||||||
|
void *pCont;
|
||||||
|
int contLen;
|
||||||
|
int32_t code;
|
||||||
|
void *handle; // rpc handle returned to app
|
||||||
|
void *ahandle; // app handle set by client
|
||||||
|
int noResp; // has response or not(default 0, 0: resp, 1: no resp);
|
||||||
|
int persistHandle; // persist handle or not
|
||||||
|
} STestMsg;
|
||||||
|
|
||||||
class UtilTesProc : public ::testing::Test {
|
class UtilTesProc : public ::testing::Test {
|
||||||
public:
|
public:
|
||||||
|
@ -38,7 +48,7 @@ class UtilTesProc : public ::testing::Test {
|
||||||
void TearDown() override { taosDropShm(&shm); }
|
void TearDown() override { taosDropShm(&shm); }
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static SRpcMsg head;
|
static STestMsg head;
|
||||||
static char body[4000];
|
static char body[4000];
|
||||||
static SShm shm;
|
static SShm shm;
|
||||||
static void SetUpTestSuite() {}
|
static void SetUpTestSuite() {}
|
||||||
|
@ -47,7 +57,7 @@ class UtilTesProc : public ::testing::Test {
|
||||||
|
|
||||||
SShm UtilTesProc::shm;
|
SShm UtilTesProc::shm;
|
||||||
char UtilTesProc::body[4000];
|
char UtilTesProc::body[4000];
|
||||||
SRpcMsg UtilTesProc::head;
|
STestMsg UtilTesProc::head;
|
||||||
|
|
||||||
TEST_F(UtilTesProc, 00_Init_Cleanup) {
|
TEST_F(UtilTesProc, 00_Init_Cleanup) {
|
||||||
ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0);
|
ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0);
|
||||||
|
@ -56,13 +66,13 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) {
|
||||||
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
|
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
|
||||||
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
||||||
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
||||||
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.childFreeBodyFp = (ProcFreeFp)taosMemoryMalloc,
|
||||||
.parentConsumeFp = (ProcConsumeFp)NULL,
|
.parentConsumeFp = (ProcConsumeFp)NULL,
|
||||||
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.parentFreeBodyFp = (ProcFreeFp)taosMemoryMalloc,
|
||||||
.shm = shm,
|
.shm = shm,
|
||||||
.parent = &shm,
|
.parent = &shm,
|
||||||
.name = "1234"};
|
.name = "1234"};
|
||||||
|
@ -80,14 +90,14 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
||||||
SRpcMsg msg;
|
STestMsg msg;
|
||||||
memcpy(&msg, pHead, headLen);
|
memcpy(&msg, pHead, headLen);
|
||||||
char body[2000] = {0};
|
char body[2000] = {0};
|
||||||
memcpy(body, pBody, bodyLen);
|
memcpy(body, pBody, bodyLen);
|
||||||
|
|
||||||
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent,
|
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent,
|
||||||
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
|
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
|
||||||
rpcFreeCont(pBody);
|
taosMemoryFree(pBody);
|
||||||
taosFreeQitem(pHead);
|
taosFreeQitem(pHead);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,13 +107,13 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
|
||||||
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1,
|
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1,
|
||||||
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
||||||
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
||||||
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.parentConsumeFp = (ProcConsumeFp)NULL,
|
.parentConsumeFp = (ProcConsumeFp)NULL,
|
||||||
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.shm = shm,
|
.shm = shm,
|
||||||
.parent = (void *)((int64_t)1235),
|
.parent = (void *)((int64_t)1235),
|
||||||
.name = "1235_c"};
|
.name = "1235_c"};
|
||||||
|
@ -116,14 +126,14 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
|
||||||
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0);
|
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0);
|
||||||
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0);
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0);
|
||||||
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0);
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0);
|
||||||
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, shm.size, 0, PROC_REQ), 0);
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_REQ), 0);
|
||||||
|
|
||||||
for (int32_t j = 0; j < 1000; j++) {
|
for (int32_t j = 0; j < 1000; j++) {
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
for (i = 0; i < 20; ++i) {
|
for (i = 0; i < 20; ++i) {
|
||||||
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0);
|
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
|
||||||
}
|
}
|
||||||
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0);
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0);
|
||||||
|
|
||||||
cfg.isChild = true;
|
cfg.isChild = true;
|
||||||
cfg.name = "1235_p";
|
cfg.name = "1235_p";
|
||||||
|
@ -138,14 +148,14 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
||||||
SRpcMsg msg;
|
STestMsg msg;
|
||||||
memcpy(&msg, pHead, headLen);
|
memcpy(&msg, pHead, headLen);
|
||||||
char body[2000] = {0};
|
char body[2000] = {0};
|
||||||
memcpy(body, pBody, bodyLen);
|
memcpy(body, pBody, bodyLen);
|
||||||
|
|
||||||
uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent,
|
uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent,
|
||||||
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
|
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
|
||||||
rpcFreeCont(pBody);
|
taosMemoryFree(pBody);
|
||||||
taosMemoryFree(pHead);
|
taosMemoryFree(pHead);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,13 +165,13 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
|
||||||
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
|
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
|
||||||
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
||||||
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
||||||
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.parentConsumeFp = (ProcConsumeFp)ConsumeParent1,
|
.parentConsumeFp = (ProcConsumeFp)ConsumeParent1,
|
||||||
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.shm = shm,
|
.shm = shm,
|
||||||
.parent = (void *)((int64_t)1236),
|
.parent = (void *)((int64_t)1236),
|
||||||
.name = "1236_c"};
|
.name = "1236_c"};
|
||||||
|
@ -176,7 +186,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
|
||||||
for (int32_t j = 0; j < 1000; j++) {
|
for (int32_t j = 0; j < 1000; j++) {
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
for (i = 0; i < 20; ++i) {
|
for (i = 0; i < 20; ++i) {
|
||||||
taosProcPutToParentQ(pproc, &head, sizeof(SRpcMsg), body, i, PROC_REQ);
|
taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_REQ);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosProcRun(cproc);
|
taosProcRun(cproc);
|
||||||
|
@ -189,14 +199,14 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
||||||
SRpcMsg msg;
|
STestMsg msg;
|
||||||
memcpy(&msg, pHead, headLen);
|
memcpy(&msg, pHead, headLen);
|
||||||
char body[2000] = {0};
|
char body[2000] = {0};
|
||||||
memcpy(body, pBody, bodyLen);
|
memcpy(body, pBody, bodyLen);
|
||||||
|
|
||||||
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent,
|
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent,
|
||||||
ftype, headLen, bodyLen, (int64_t)msg.handle, body);
|
ftype, headLen, bodyLen, (int64_t)msg.handle, body);
|
||||||
rpcFreeCont(pBody);
|
taosMemoryFree(pBody);
|
||||||
taosFreeQitem(pHead);
|
taosFreeQitem(pHead);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -209,13 +219,13 @@ TEST_F(UtilTesProc, 03_Handle) {
|
||||||
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3,
|
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3,
|
||||||
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
||||||
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
||||||
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.childFreeBodyFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.parentConsumeFp = (ProcConsumeFp)NULL,
|
.parentConsumeFp = (ProcConsumeFp)NULL,
|
||||||
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
.parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
.parentFreeBodyFp = (ProcFreeFp)taosMemoryFree,
|
||||||
.shm = shm,
|
.shm = shm,
|
||||||
.parent = (void *)((int64_t)1235),
|
.parent = (void *)((int64_t)1235),
|
||||||
.name = "1237_p"};
|
.name = "1237_p"};
|
||||||
|
@ -226,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) {
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
for (i = 0; i < 20; ++i) {
|
for (i = 0; i < 20; ++i) {
|
||||||
head.handle = (void *)((int64_t)i);
|
head.handle = (void *)((int64_t)i);
|
||||||
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0);
|
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg.isChild = true;
|
cfg.isChild = true;
|
||||||
|
|
Loading…
Reference in New Issue