From 16382206671b0378140b603a2f47a662a817251e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 19:23:38 +0800 Subject: [PATCH] proc test --- include/util/tprocess.h | 5 +- source/util/src/terror.c | 2 +- source/util/src/tprocess.c | 47 +++++++----- source/util/test/procTest.cpp | 135 ++++++++++++++++++++++++++++++++-- 4 files changed, 161 insertions(+), 28 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 80c855b78c..c5f33140dd 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,12 +22,12 @@ extern "C" { #endif -typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; +typedef enum { PROC_REQ = 1, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; typedef struct SProcObj SProcObj; typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcFreeFp)(void *pCont); -typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, +typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype); typedef struct { @@ -50,6 +50,7 @@ typedef struct { SProcObj *taosProcInit(const SProcCfg *pCfg); void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); +void taosProcStop(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void *handle, ProcFuncType ftype); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 90fa624a8d..a89c3c8eb4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -68,7 +68,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") -TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported") @@ -96,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string") TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible") +TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 51eb918c54..1e71e7a8a9 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -156,6 +156,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) { + if (rawHeadLen == 0 || pHead == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; @@ -177,13 +182,13 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char const int32_t pos = pQueue->tail; if (pQueue->tail < pQueue->total) { - *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen; + *(int16_t *)(pQueue->pBuffer + pQueue->tail) = rawHeadLen; *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype; - *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen; + *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = rawBodyLen; } else { - *(int16_t *)(pQueue->pBuffer) = headLen; + *(int16_t *)(pQueue->pBuffer) = rawHeadLen; *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype; - *(int32_t *)(pQueue->pBuffer + 4) = bodyLen; + *(int32_t *)(pQueue->pBuffer + 4) = rawBodyLen; } if (pQueue->tail < pQueue->head) { @@ -239,18 +244,20 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea return 0; } - int16_t headLen = 0; + int16_t rawHeadLen = 0; int8_t ftype = 0; - int32_t bodyLen = 0; + int32_t rawBodyLen = 0; if (pQueue->head < pQueue->total) { - headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head); + rawHeadLen = *(int16_t *)(pQueue->pBuffer + pQueue->head); ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2); - bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); + rawBodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); } else { - headLen = *(int16_t *)(pQueue->pBuffer); + rawHeadLen = *(int16_t *)(pQueue->pBuffer); ftype = *(int8_t *)(pQueue->pBuffer + 2); - bodyLen = *(int32_t *)(pQueue->pBuffer + 4); + rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4); } + int16_t headLen = CEIL8(rawHeadLen); + int16_t bodyLen = CEIL8(rawBodyLen); void *pHead = (*mallocHeadFp)(headLen); void *pBody = (*mallocBodyFp)(bodyLen); @@ -301,12 +308,12 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea *ppHead = pHead; *ppBody = pBody; - *pHeadLen = headLen; - *pBodyLen = bodyLen; + *pHeadLen = rawHeadLen; + *pBodyLen = rawBodyLen; *pFuncType = (ProcFuncType)ftype; uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, - pQueue->items, headLen, pHead, bodyLen, pBody); + pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody); return 1; } @@ -383,7 +390,7 @@ static void taosProcThreadLoop(SProcObj *pProc) { freeBodyFp = pProc->parentFreeBodyFp; } - uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue); + uDebug("proc:%s, start to get msg from queue:%p, thread:%" PRId64, pProc->name, pQueue, pProc->thread); while (1) { int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp, @@ -412,11 +419,11 @@ int32_t taosProcRun(SProcObj *pProc) { return -1; } - uDebug("proc:%s, start to consume queue:%p, thread:%" PRId64, pProc->name, pProc->pChildQueue, pProc->thread); + uDebug("proc:%s, start to consume, thread:%" PRId64, pProc->name, pProc->thread); return 0; } -static void taosProcStop(SProcObj *pProc) { +void taosProcStop(SProcObj *pProc) { if (!taosCheckPthreadValid(pProc->thread)) return; uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread); @@ -428,6 +435,7 @@ static void taosProcStop(SProcObj *pProc) { } tsem_post(&pQueue->sem); taosThreadJoin(pProc->thread, NULL); + pProc->thread = 0; } void taosProcCleanup(SProcObj *pProc) { @@ -448,6 +456,10 @@ void taosProcCleanup(SProcObj *pProc) { int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void *handle, ProcFuncType ftype) { + if (ftype != PROC_REQ) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype); } @@ -472,8 +484,7 @@ void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, c ProcFuncType ftype) { int32_t retry = 0; while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { - uInfo("proc:%s, failed to put msg to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), - retry); + uInfo("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); retry++; taosMsleep(retry); } diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 0a0ae32882..9924b1bf4e 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -10,23 +10,31 @@ */ #include +#include "sut.h" #include "tprocess.h" #include "tqueue.h" #include "trpc.h" -#include "sut.h" class UtilTesProc : public ::testing::Test { public: void SetUp() override { test.InitLog("/tmp/td"); - uDebugFlag = 207; + // uDebugFlag = 207; shm.id = -1; + for (int32_t i = 0; i < 4000; ++i) { + body[i] = i % 26 + 'a'; + } + head.pCont = body; + head.code = 1; + head.msgType = 2; + head.noResp = 3; + head.persistHandle = 4; } - void TearDown() override { - taosDropShm(&shm); - } + void TearDown() override { taosDropShm(&shm); } public: + static SRpcMsg head; + static char body[4000]; static Testbase test; static SShm shm; static void SetUpTestSuite() {} @@ -35,8 +43,10 @@ class UtilTesProc : public ::testing::Test { Testbase UtilTesProc::test; SShm UtilTesProc::shm; +char UtilTesProc::body[4000]; +SRpcMsg UtilTesProc::head; -TEST_F(UtilTesProc, 01_Create_Drop_Proc) { +TEST_F(UtilTesProc, 00_Init_Cleanup) { ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0); shm.size = 1023; @@ -61,5 +71,116 @@ TEST_F(UtilTesProc, 01_Create_Drop_Proc) { proc = taosProcInit(&cfg); ASSERT_NE(proc, nullptr); + ASSERT_EQ(taosProcRun(proc), 0); + taosProcCleanup(proc); taosDropShm(&shm); -} \ No newline at end of file +} + +void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { + SRpcMsg msg; + memcpy(&msg, pHead, headLen); + char body[2000] = {0}; + memcpy(body, pBody, bodyLen); + + uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent, + ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); + rpcFreeCont(pBody); + taosFreeQitem(pHead); +} + +TEST_F(UtilTesProc, 01_Push_Pop_Child) { + shm.size = 3000; + ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0); + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)NULL, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = shm, + .parent = (void *)((int64_t)1235), + .name = "child_queue"}; + SProcObj *cproc = taosProcInit(&cfg); + ASSERT_NE(cproc, nullptr); + + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RSP), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REGIST), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RELEASE), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, shm.size, 0, PROC_REQ), 0); + + for (int32_t j = 0; j < 1000; j++) { + int32_t i = 0; + for (i = 0; i < 20; ++i) { + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); + } + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); + + cfg.isChild = true; + cfg.name = "child_queue"; + SProcObj *pproc = taosProcInit(&cfg); + ASSERT_NE(pproc, nullptr); + taosProcRun(pproc); + taosProcCleanup(pproc); + } + + taosProcCleanup(cproc); + taosDropShm(&shm); +} + +void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { + SRpcMsg msg; + memcpy(&msg, pHead, headLen); + char body[2000] = {0}; + memcpy(body, pBody, bodyLen); + + uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent, + ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); + rpcFreeCont(pBody); + taosMemoryFree(pHead); +} + +TEST_F(UtilTesProc, 02_Push_Pop_Parent) { + shm.size = 3000; + ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0); + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)ConsumeParent1, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = shm, + .parent = (void *)((int64_t)1235), + .name = "child_queue"}; + SProcObj *cproc = taosProcInit(&cfg); + ASSERT_NE(cproc, nullptr); + + cfg.name = "parent_queue"; + cfg.isChild = true; + SProcObj *pproc = taosProcInit(&cfg); + ASSERT_NE(pproc, nullptr); + + for (int32_t j = 0; j < 1000; j++) { + int32_t i = 0; + for (i = 0; i < 20; ++i) { + taosProcPutToParentQ(pproc, &head, sizeof(SRpcMsg), body, i, PROC_REQ); + } + + taosProcRun(cproc); + taosProcStop(cproc); + } + + taosProcCleanup(pproc); + taosProcCleanup(cproc); + taosDropShm(&shm); +}