proc test
This commit is contained in:
parent
5382648ba6
commit
1638220667
|
@ -22,12 +22,12 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
|
typedef enum { PROC_REQ = 1, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType;
|
||||||
|
|
||||||
typedef struct SProcObj SProcObj;
|
typedef struct SProcObj SProcObj;
|
||||||
typedef void *(*ProcMallocFp)(int32_t contLen);
|
typedef void *(*ProcMallocFp)(int32_t contLen);
|
||||||
typedef void *(*ProcFreeFp)(void *pCont);
|
typedef void *(*ProcFreeFp)(void *pCont);
|
||||||
typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
|
||||||
ProcFuncType ftype);
|
ProcFuncType ftype);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -50,6 +50,7 @@ typedef struct {
|
||||||
SProcObj *taosProcInit(const SProcCfg *pCfg);
|
SProcObj *taosProcInit(const SProcCfg *pCfg);
|
||||||
void taosProcCleanup(SProcObj *pProc);
|
void taosProcCleanup(SProcObj *pProc);
|
||||||
int32_t taosProcRun(SProcObj *pProc);
|
int32_t taosProcRun(SProcObj *pProc);
|
||||||
|
void taosProcStop(SProcObj *pProc);
|
||||||
|
|
||||||
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||||
void *handle, ProcFuncType ftype);
|
void *handle, ProcFuncType ftype);
|
||||||
|
|
|
@ -68,7 +68,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
|
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
|
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
|
|
||||||
|
|
||||||
//common & util
|
//common & util
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
|
TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")
|
||||||
|
@ -96,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string")
|
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg")
|
||||||
|
|
||||||
//client
|
//client
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
|
||||||
|
|
|
@ -156,6 +156,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
|
||||||
|
|
||||||
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
|
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
|
||||||
const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) {
|
const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) {
|
||||||
|
if (rawHeadLen == 0 || pHead == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
const int32_t headLen = CEIL8(rawHeadLen);
|
const int32_t headLen = CEIL8(rawHeadLen);
|
||||||
const int32_t bodyLen = CEIL8(rawBodyLen);
|
const int32_t bodyLen = CEIL8(rawBodyLen);
|
||||||
const int32_t fullLen = headLen + bodyLen + 8;
|
const int32_t fullLen = headLen + bodyLen + 8;
|
||||||
|
@ -177,13 +182,13 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
|
||||||
|
|
||||||
const int32_t pos = pQueue->tail;
|
const int32_t pos = pQueue->tail;
|
||||||
if (pQueue->tail < pQueue->total) {
|
if (pQueue->tail < pQueue->total) {
|
||||||
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen;
|
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = rawHeadLen;
|
||||||
*(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
|
*(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
|
||||||
*(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen;
|
*(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = rawBodyLen;
|
||||||
} else {
|
} else {
|
||||||
*(int16_t *)(pQueue->pBuffer) = headLen;
|
*(int16_t *)(pQueue->pBuffer) = rawHeadLen;
|
||||||
*(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
|
*(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
|
||||||
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
|
*(int32_t *)(pQueue->pBuffer + 4) = rawBodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueue->tail < pQueue->head) {
|
if (pQueue->tail < pQueue->head) {
|
||||||
|
@ -239,18 +244,20 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int16_t headLen = 0;
|
int16_t rawHeadLen = 0;
|
||||||
int8_t ftype = 0;
|
int8_t ftype = 0;
|
||||||
int32_t bodyLen = 0;
|
int32_t rawBodyLen = 0;
|
||||||
if (pQueue->head < pQueue->total) {
|
if (pQueue->head < pQueue->total) {
|
||||||
headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
|
rawHeadLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
|
||||||
ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
|
ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
|
||||||
bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
|
rawBodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
|
||||||
} else {
|
} else {
|
||||||
headLen = *(int16_t *)(pQueue->pBuffer);
|
rawHeadLen = *(int16_t *)(pQueue->pBuffer);
|
||||||
ftype = *(int8_t *)(pQueue->pBuffer + 2);
|
ftype = *(int8_t *)(pQueue->pBuffer + 2);
|
||||||
bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
|
rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4);
|
||||||
}
|
}
|
||||||
|
int16_t headLen = CEIL8(rawHeadLen);
|
||||||
|
int16_t bodyLen = CEIL8(rawBodyLen);
|
||||||
|
|
||||||
void *pHead = (*mallocHeadFp)(headLen);
|
void *pHead = (*mallocHeadFp)(headLen);
|
||||||
void *pBody = (*mallocBodyFp)(bodyLen);
|
void *pBody = (*mallocBodyFp)(bodyLen);
|
||||||
|
@ -301,12 +308,12 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
||||||
|
|
||||||
*ppHead = pHead;
|
*ppHead = pHead;
|
||||||
*ppBody = pBody;
|
*ppBody = pBody;
|
||||||
*pHeadLen = headLen;
|
*pHeadLen = rawHeadLen;
|
||||||
*pBodyLen = bodyLen;
|
*pBodyLen = rawBodyLen;
|
||||||
*pFuncType = (ProcFuncType)ftype;
|
*pFuncType = (ProcFuncType)ftype;
|
||||||
|
|
||||||
uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
|
uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
|
||||||
pQueue->items, headLen, pHead, bodyLen, pBody);
|
pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,7 +390,7 @@ static void taosProcThreadLoop(SProcObj *pProc) {
|
||||||
freeBodyFp = pProc->parentFreeBodyFp;
|
freeBodyFp = pProc->parentFreeBodyFp;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue);
|
uDebug("proc:%s, start to get msg from queue:%p, thread:%" PRId64, pProc->name, pQueue, pProc->thread);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
|
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
|
||||||
|
@ -412,11 +419,11 @@ int32_t taosProcRun(SProcObj *pProc) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("proc:%s, start to consume queue:%p, thread:%" PRId64, pProc->name, pProc->pChildQueue, pProc->thread);
|
uDebug("proc:%s, start to consume, thread:%" PRId64, pProc->name, pProc->thread);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void taosProcStop(SProcObj *pProc) {
|
void taosProcStop(SProcObj *pProc) {
|
||||||
if (!taosCheckPthreadValid(pProc->thread)) return;
|
if (!taosCheckPthreadValid(pProc->thread)) return;
|
||||||
|
|
||||||
uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
|
uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
|
||||||
|
@ -428,6 +435,7 @@ static void taosProcStop(SProcObj *pProc) {
|
||||||
}
|
}
|
||||||
tsem_post(&pQueue->sem);
|
tsem_post(&pQueue->sem);
|
||||||
taosThreadJoin(pProc->thread, NULL);
|
taosThreadJoin(pProc->thread, NULL);
|
||||||
|
pProc->thread = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosProcCleanup(SProcObj *pProc) {
|
void taosProcCleanup(SProcObj *pProc) {
|
||||||
|
@ -448,6 +456,10 @@ void taosProcCleanup(SProcObj *pProc) {
|
||||||
|
|
||||||
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||||
void *handle, ProcFuncType ftype) {
|
void *handle, ProcFuncType ftype) {
|
||||||
|
if (ftype != PROC_REQ) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype);
|
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,8 +484,7 @@ void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, c
|
||||||
ProcFuncType ftype) {
|
ProcFuncType ftype) {
|
||||||
int32_t retry = 0;
|
int32_t retry = 0;
|
||||||
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
|
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) {
|
||||||
uInfo("proc:%s, failed to put msg to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(),
|
uInfo("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
|
||||||
retry);
|
|
||||||
retry++;
|
retry++;
|
||||||
taosMsleep(retry);
|
taosMsleep(retry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,23 +10,31 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#include "sut.h"
|
||||||
#include "tprocess.h"
|
#include "tprocess.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "sut.h"
|
|
||||||
|
|
||||||
class UtilTesProc : public ::testing::Test {
|
class UtilTesProc : public ::testing::Test {
|
||||||
public:
|
public:
|
||||||
void SetUp() override {
|
void SetUp() override {
|
||||||
test.InitLog("/tmp/td");
|
test.InitLog("/tmp/td");
|
||||||
uDebugFlag = 207;
|
// uDebugFlag = 207;
|
||||||
shm.id = -1;
|
shm.id = -1;
|
||||||
|
for (int32_t i = 0; i < 4000; ++i) {
|
||||||
|
body[i] = i % 26 + 'a';
|
||||||
|
}
|
||||||
|
head.pCont = body;
|
||||||
|
head.code = 1;
|
||||||
|
head.msgType = 2;
|
||||||
|
head.noResp = 3;
|
||||||
|
head.persistHandle = 4;
|
||||||
}
|
}
|
||||||
void TearDown() override {
|
void TearDown() override { taosDropShm(&shm); }
|
||||||
taosDropShm(&shm);
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
static SRpcMsg head;
|
||||||
|
static char body[4000];
|
||||||
static Testbase test;
|
static Testbase test;
|
||||||
static SShm shm;
|
static SShm shm;
|
||||||
static void SetUpTestSuite() {}
|
static void SetUpTestSuite() {}
|
||||||
|
@ -35,8 +43,10 @@ class UtilTesProc : public ::testing::Test {
|
||||||
|
|
||||||
Testbase UtilTesProc::test;
|
Testbase UtilTesProc::test;
|
||||||
SShm UtilTesProc::shm;
|
SShm UtilTesProc::shm;
|
||||||
|
char UtilTesProc::body[4000];
|
||||||
|
SRpcMsg UtilTesProc::head;
|
||||||
|
|
||||||
TEST_F(UtilTesProc, 01_Create_Drop_Proc) {
|
TEST_F(UtilTesProc, 00_Init_Cleanup) {
|
||||||
ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0);
|
ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0);
|
||||||
|
|
||||||
shm.size = 1023;
|
shm.size = 1023;
|
||||||
|
@ -61,5 +71,116 @@ TEST_F(UtilTesProc, 01_Create_Drop_Proc) {
|
||||||
proc = taosProcInit(&cfg);
|
proc = taosProcInit(&cfg);
|
||||||
ASSERT_NE(proc, nullptr);
|
ASSERT_NE(proc, nullptr);
|
||||||
|
|
||||||
|
ASSERT_EQ(taosProcRun(proc), 0);
|
||||||
|
taosProcCleanup(proc);
|
||||||
|
taosDropShm(&shm);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
||||||
|
SRpcMsg msg;
|
||||||
|
memcpy(&msg, pHead, headLen);
|
||||||
|
char body[2000] = {0};
|
||||||
|
memcpy(body, pBody, bodyLen);
|
||||||
|
|
||||||
|
uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent,
|
||||||
|
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
|
||||||
|
rpcFreeCont(pBody);
|
||||||
|
taosFreeQitem(pHead);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(UtilTesProc, 01_Push_Pop_Child) {
|
||||||
|
shm.size = 3000;
|
||||||
|
ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0);
|
||||||
|
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1,
|
||||||
|
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
||||||
|
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
||||||
|
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
||||||
|
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
||||||
|
.parentConsumeFp = (ProcConsumeFp)NULL,
|
||||||
|
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
|
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
||||||
|
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
||||||
|
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
||||||
|
.shm = shm,
|
||||||
|
.parent = (void *)((int64_t)1235),
|
||||||
|
.name = "child_queue"};
|
||||||
|
SProcObj *cproc = taosProcInit(&cfg);
|
||||||
|
ASSERT_NE(cproc, nullptr);
|
||||||
|
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RSP), 0);
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REGIST), 0);
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RELEASE), 0);
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0);
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0);
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0);
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, shm.size, 0, PROC_REQ), 0);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < 1000; j++) {
|
||||||
|
int32_t i = 0;
|
||||||
|
for (i = 0; i < 20; ++i) {
|
||||||
|
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0);
|
||||||
|
}
|
||||||
|
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0);
|
||||||
|
|
||||||
|
cfg.isChild = true;
|
||||||
|
cfg.name = "child_queue";
|
||||||
|
SProcObj *pproc = taosProcInit(&cfg);
|
||||||
|
ASSERT_NE(pproc, nullptr);
|
||||||
|
taosProcRun(pproc);
|
||||||
|
taosProcCleanup(pproc);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosProcCleanup(cproc);
|
||||||
|
taosDropShm(&shm);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) {
|
||||||
|
SRpcMsg msg;
|
||||||
|
memcpy(&msg, pHead, headLen);
|
||||||
|
char body[2000] = {0};
|
||||||
|
memcpy(body, pBody, bodyLen);
|
||||||
|
|
||||||
|
uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent,
|
||||||
|
ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body);
|
||||||
|
rpcFreeCont(pBody);
|
||||||
|
taosMemoryFree(pHead);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(UtilTesProc, 02_Push_Pop_Parent) {
|
||||||
|
shm.size = 3000;
|
||||||
|
ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0);
|
||||||
|
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL,
|
||||||
|
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
|
||||||
|
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
|
||||||
|
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
||||||
|
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
||||||
|
.parentConsumeFp = (ProcConsumeFp)ConsumeParent1,
|
||||||
|
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
|
||||||
|
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
|
||||||
|
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
|
||||||
|
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
|
||||||
|
.shm = shm,
|
||||||
|
.parent = (void *)((int64_t)1235),
|
||||||
|
.name = "child_queue"};
|
||||||
|
SProcObj *cproc = taosProcInit(&cfg);
|
||||||
|
ASSERT_NE(cproc, nullptr);
|
||||||
|
|
||||||
|
cfg.name = "parent_queue";
|
||||||
|
cfg.isChild = true;
|
||||||
|
SProcObj *pproc = taosProcInit(&cfg);
|
||||||
|
ASSERT_NE(pproc, nullptr);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < 1000; j++) {
|
||||||
|
int32_t i = 0;
|
||||||
|
for (i = 0; i < 20; ++i) {
|
||||||
|
taosProcPutToParentQ(pproc, &head, sizeof(SRpcMsg), body, i, PROC_REQ);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosProcRun(cproc);
|
||||||
|
taosProcStop(cproc);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosProcCleanup(pproc);
|
||||||
|
taosProcCleanup(cproc);
|
||||||
taosDropShm(&shm);
|
taosDropShm(&shm);
|
||||||
}
|
}
|
Loading…
Reference in New Issue