From 279af752c310137b29bdfae7ed4586be09ebeda0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 10:22:16 +0800 Subject: [PATCH 01/10] rename test --- source/util/src/tprocess.c | 7 +++++-- source/util/test/CMakeLists.txt | 8 ++++---- source/util/test/{queueTest.cpp => procTest.cpp} | 0 3 files changed, 9 insertions(+), 6 deletions(-) rename source/util/test/{queueTest.cpp => procTest.cpp} (100%) diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 139c35de45..cacedce857 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -392,7 +392,7 @@ static void taosProcThreadLoop(SProcObj *pProc) { uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue); break; } else if (numOfMsgs < 0) { - uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr()); + uError("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr()); taosMsleep(1); continue; } else { @@ -470,7 +470,10 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype) { + int32_t retry = 0; while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { - taosMsleep(1); + uInfo("proc:%s, failed to put msg to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); + retry++; + taosMsleep(retry); } } diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index d6b779b6e3..0d990204d2 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -46,11 +46,11 @@ add_executable(encodeTest "encodeTest.cpp") target_link_libraries(encodeTest os util gtest gtest_main) # queueTest -add_executable(queue_test "queueTest.cpp") -target_link_libraries(queue_test os util gtest_main) +add_executable(proc_test "procTest.cpp") +target_link_libraries(proc_test os util gtest_main) add_test( - NAME queue_test - COMMAND queue_test + NAME proc_test + COMMAND proc_test ) # cfgTest diff --git a/source/util/test/queueTest.cpp b/source/util/test/procTest.cpp similarity index 100% rename from source/util/test/queueTest.cpp rename to source/util/test/procTest.cpp From 5382648ba61ae1682a058fde0023f6fbbcadf2e6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 11:01:16 +0800 Subject: [PATCH 02/10] proc test --- source/dnode/mgmt/test/sut/inc/sut.h | 4 +- source/util/src/tprocess.c | 5 ++- source/util/test/CMakeLists.txt | 8 ++-- source/util/test/procTest.cpp | 57 +++++++++++++++++++++++----- 4 files changed, 55 insertions(+), 19 deletions(-) diff --git a/source/dnode/mgmt/test/sut/inc/sut.h b/source/dnode/mgmt/test/sut/inc/sut.h index f7e2831160..1dd68574fe 100644 --- a/source/dnode/mgmt/test/sut/inc/sut.h +++ b/source/dnode/mgmt/test/sut/inc/sut.h @@ -40,9 +40,7 @@ class Testbase { void ServerStart(); void ClientRestart(); SRpcMsg* SendReq(tmsg_t msgType, void* pCont, int32_t contLen); - - private: - void InitLog(const char* path); + void InitLog(const char* path); private: TestServer server; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index cacedce857..51eb918c54 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -80,7 +80,7 @@ static int32_t taosProcInitMutex(SProcQueue *pQueue) { } if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) { - taosThreadMutexDestroy(&pQueue->mutex); + taosThreadMutexAttrDestroy(&mattr); terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to init mutex since %s", terrstr()); return -1; @@ -472,7 +472,8 @@ void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, c ProcFuncType ftype) { int32_t retry = 0; while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { - uInfo("proc:%s, failed to put msg to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); + uInfo("proc:%s, failed to put msg to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), + retry); retry++; taosMsleep(retry); } diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 0d990204d2..d98f9f677d 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -46,11 +46,11 @@ add_executable(encodeTest "encodeTest.cpp") target_link_libraries(encodeTest os util gtest gtest_main) # queueTest -add_executable(proc_test "procTest.cpp") -target_link_libraries(proc_test os util gtest_main) +add_executable(procTest "procTest.cpp") +target_link_libraries(procTest os util transport sut gtest_main) add_test( - NAME proc_test - COMMAND proc_test + NAME procTest + COMMAND procTest ) # cfgTest diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 0c4bcf84ad..0a0ae32882 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -10,19 +10,56 @@ */ #include - -#include "os.h" +#include "tprocess.h" #include "tqueue.h" +#include "trpc.h" +#include "sut.h" -#include -#include - -class UtilTestQueue : public ::testing::Test { +class UtilTesProc : public ::testing::Test { public: - void SetUp() override {} - void TearDown() override {} + void SetUp() override { + test.InitLog("/tmp/td"); + uDebugFlag = 207; + shm.id = -1; + } + void TearDown() override { + taosDropShm(&shm); + } public: - static void SetUpTestSuite() {} - static void TearDownTestSuite() {} + static Testbase test; + static SShm shm; + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} }; + +Testbase UtilTesProc::test; +SShm UtilTesProc::shm; + +TEST_F(UtilTesProc, 01_Create_Drop_Proc) { + ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0); + + shm.size = 1023; + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)NULL, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = shm, + .parent = &shm, + .name = "1234"}; + SProcObj *proc = taosProcInit(&cfg); + ASSERT_EQ(proc, nullptr); + + shm.size = 2468; + cfg.shm = shm; + proc = taosProcInit(&cfg); + ASSERT_NE(proc, nullptr); + + taosDropShm(&shm); +} \ No newline at end of file From 16382206671b0378140b603a2f47a662a817251e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 19:23:38 +0800 Subject: [PATCH 03/10] proc test --- include/util/tprocess.h | 5 +- source/util/src/terror.c | 2 +- source/util/src/tprocess.c | 47 +++++++----- source/util/test/procTest.cpp | 135 ++++++++++++++++++++++++++++++++-- 4 files changed, 161 insertions(+), 28 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 80c855b78c..c5f33140dd 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,12 +22,12 @@ extern "C" { #endif -typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; +typedef enum { PROC_REQ = 1, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; typedef struct SProcObj SProcObj; typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcFreeFp)(void *pCont); -typedef void *(*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, +typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype); typedef struct { @@ -50,6 +50,7 @@ typedef struct { SProcObj *taosProcInit(const SProcCfg *pCfg); void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); +void taosProcStop(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void *handle, ProcFuncType ftype); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 90fa624a8d..a89c3c8eb4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -68,7 +68,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") -TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported") @@ -96,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REF_NOT_EXIST, "Ref is not there") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_NUMBER, "Invalid version number") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VERSION_STRING, "Invalid version string") TAOS_DEFINE_ERROR(TSDB_CODE_VERSION_NOT_COMPATIBLE, "Version not compatible") +TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 51eb918c54..1e71e7a8a9 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -156,6 +156,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) { + if (rawHeadLen == 0 || pHead == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; @@ -177,13 +182,13 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char const int32_t pos = pQueue->tail; if (pQueue->tail < pQueue->total) { - *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen; + *(int16_t *)(pQueue->pBuffer + pQueue->tail) = rawHeadLen; *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype; - *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen; + *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = rawBodyLen; } else { - *(int16_t *)(pQueue->pBuffer) = headLen; + *(int16_t *)(pQueue->pBuffer) = rawHeadLen; *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype; - *(int32_t *)(pQueue->pBuffer + 4) = bodyLen; + *(int32_t *)(pQueue->pBuffer + 4) = rawBodyLen; } if (pQueue->tail < pQueue->head) { @@ -239,18 +244,20 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea return 0; } - int16_t headLen = 0; + int16_t rawHeadLen = 0; int8_t ftype = 0; - int32_t bodyLen = 0; + int32_t rawBodyLen = 0; if (pQueue->head < pQueue->total) { - headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head); + rawHeadLen = *(int16_t *)(pQueue->pBuffer + pQueue->head); ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2); - bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); + rawBodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); } else { - headLen = *(int16_t *)(pQueue->pBuffer); + rawHeadLen = *(int16_t *)(pQueue->pBuffer); ftype = *(int8_t *)(pQueue->pBuffer + 2); - bodyLen = *(int32_t *)(pQueue->pBuffer + 4); + rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4); } + int16_t headLen = CEIL8(rawHeadLen); + int16_t bodyLen = CEIL8(rawBodyLen); void *pHead = (*mallocHeadFp)(headLen); void *pBody = (*mallocBodyFp)(bodyLen); @@ -301,12 +308,12 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea *ppHead = pHead; *ppBody = pBody; - *pHeadLen = headLen; - *pBodyLen = bodyLen; + *pHeadLen = rawHeadLen; + *pBodyLen = rawBodyLen; *pFuncType = (ProcFuncType)ftype; uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, - pQueue->items, headLen, pHead, bodyLen, pBody); + pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody); return 1; } @@ -383,7 +390,7 @@ static void taosProcThreadLoop(SProcObj *pProc) { freeBodyFp = pProc->parentFreeBodyFp; } - uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue); + uDebug("proc:%s, start to get msg from queue:%p, thread:%" PRId64, pProc->name, pQueue, pProc->thread); while (1) { int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp, @@ -412,11 +419,11 @@ int32_t taosProcRun(SProcObj *pProc) { return -1; } - uDebug("proc:%s, start to consume queue:%p, thread:%" PRId64, pProc->name, pProc->pChildQueue, pProc->thread); + uDebug("proc:%s, start to consume, thread:%" PRId64, pProc->name, pProc->thread); return 0; } -static void taosProcStop(SProcObj *pProc) { +void taosProcStop(SProcObj *pProc) { if (!taosCheckPthreadValid(pProc->thread)) return; uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread); @@ -428,6 +435,7 @@ static void taosProcStop(SProcObj *pProc) { } tsem_post(&pQueue->sem); taosThreadJoin(pProc->thread, NULL); + pProc->thread = 0; } void taosProcCleanup(SProcObj *pProc) { @@ -448,6 +456,10 @@ void taosProcCleanup(SProcObj *pProc) { int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void *handle, ProcFuncType ftype) { + if (ftype != PROC_REQ) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype); } @@ -472,8 +484,7 @@ void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, c ProcFuncType ftype) { int32_t retry = 0; while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { - uInfo("proc:%s, failed to put msg to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), - retry); + uInfo("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); retry++; taosMsleep(retry); } diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 0a0ae32882..9924b1bf4e 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -10,23 +10,31 @@ */ #include +#include "sut.h" #include "tprocess.h" #include "tqueue.h" #include "trpc.h" -#include "sut.h" class UtilTesProc : public ::testing::Test { public: void SetUp() override { test.InitLog("/tmp/td"); - uDebugFlag = 207; + // uDebugFlag = 207; shm.id = -1; + for (int32_t i = 0; i < 4000; ++i) { + body[i] = i % 26 + 'a'; + } + head.pCont = body; + head.code = 1; + head.msgType = 2; + head.noResp = 3; + head.persistHandle = 4; } - void TearDown() override { - taosDropShm(&shm); - } + void TearDown() override { taosDropShm(&shm); } public: + static SRpcMsg head; + static char body[4000]; static Testbase test; static SShm shm; static void SetUpTestSuite() {} @@ -35,8 +43,10 @@ class UtilTesProc : public ::testing::Test { Testbase UtilTesProc::test; SShm UtilTesProc::shm; +char UtilTesProc::body[4000]; +SRpcMsg UtilTesProc::head; -TEST_F(UtilTesProc, 01_Create_Drop_Proc) { +TEST_F(UtilTesProc, 00_Init_Cleanup) { ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0); shm.size = 1023; @@ -61,5 +71,116 @@ TEST_F(UtilTesProc, 01_Create_Drop_Proc) { proc = taosProcInit(&cfg); ASSERT_NE(proc, nullptr); + ASSERT_EQ(taosProcRun(proc), 0); + taosProcCleanup(proc); taosDropShm(&shm); -} \ No newline at end of file +} + +void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { + SRpcMsg msg; + memcpy(&msg, pHead, headLen); + char body[2000] = {0}; + memcpy(body, pBody, bodyLen); + + uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent, + ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); + rpcFreeCont(pBody); + taosFreeQitem(pHead); +} + +TEST_F(UtilTesProc, 01_Push_Pop_Child) { + shm.size = 3000; + ASSERT_EQ(taosCreateShm(&shm, 1235, shm.size), 0); + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)NULL, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = shm, + .parent = (void *)((int64_t)1235), + .name = "child_queue"}; + SProcObj *cproc = taosProcInit(&cfg); + ASSERT_NE(cproc, nullptr); + + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RSP), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REGIST), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RELEASE), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, shm.size, 0, PROC_REQ), 0); + + for (int32_t j = 0; j < 1000; j++) { + int32_t i = 0; + for (i = 0; i < 20; ++i) { + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); + } + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); + + cfg.isChild = true; + cfg.name = "child_queue"; + SProcObj *pproc = taosProcInit(&cfg); + ASSERT_NE(pproc, nullptr); + taosProcRun(pproc); + taosProcCleanup(pproc); + } + + taosProcCleanup(cproc); + taosDropShm(&shm); +} + +void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { + SRpcMsg msg; + memcpy(&msg, pHead, headLen); + char body[2000] = {0}; + memcpy(body, pBody, bodyLen); + + uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent, + ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); + rpcFreeCont(pBody); + taosMemoryFree(pHead); +} + +TEST_F(UtilTesProc, 02_Push_Pop_Parent) { + shm.size = 3000; + ASSERT_EQ(taosCreateShm(&shm, 1236, shm.size), 0); + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)ConsumeParent1, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = shm, + .parent = (void *)((int64_t)1235), + .name = "child_queue"}; + SProcObj *cproc = taosProcInit(&cfg); + ASSERT_NE(cproc, nullptr); + + cfg.name = "parent_queue"; + cfg.isChild = true; + SProcObj *pproc = taosProcInit(&cfg); + ASSERT_NE(pproc, nullptr); + + for (int32_t j = 0; j < 1000; j++) { + int32_t i = 0; + for (i = 0; i < 20; ++i) { + taosProcPutToParentQ(pproc, &head, sizeof(SRpcMsg), body, i, PROC_REQ); + } + + taosProcRun(cproc); + taosProcStop(cproc); + } + + taosProcCleanup(pproc); + taosProcCleanup(cproc); + taosDropShm(&shm); +} From 563d2ec6134e4d727638ed7c0ff167775badce7c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 19:41:41 +0800 Subject: [PATCH 04/10] proc test --- source/dnode/mgmt/main/dndExec.c | 1 + source/util/src/tprocess.c | 1 + source/util/test/procTest.cpp | 68 +++++++++++++++++++++++++++++--- 3 files changed, 65 insertions(+), 5 deletions(-) diff --git a/source/dnode/mgmt/main/dndExec.c b/source/dnode/mgmt/main/dndExec.c index d76ef37a5d..b2d5732706 100644 --- a/source/dnode/mgmt/main/dndExec.c +++ b/source/dnode/mgmt/main/dndExec.c @@ -89,6 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { } static void dndProcessProcHandle(void *handle) { + dInfo("handle:%p, the child process dies and send an offline rsp", handle); SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE}; rpcSendResponse(&rpcMsg); } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 1e71e7a8a9..e8abb38b9f 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -476,6 +476,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { while (h != NULL) { void *handle = *((void **)h); (*HandleFp)(handle); + h = taosHashIterate(pProc->hash, h); } taosThreadMutexUnlock(&pProc->pChildQueue->mutex); } diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 9924b1bf4e..04a227e52b 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -103,7 +103,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = shm, .parent = (void *)((int64_t)1235), - .name = "child_queue"}; + .name = "1235_c"}; SProcObj *cproc = taosProcInit(&cfg); ASSERT_NE(cproc, nullptr); @@ -123,7 +123,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); cfg.isChild = true; - cfg.name = "child_queue"; + cfg.name = "1235_p"; SProcObj *pproc = taosProcInit(&cfg); ASSERT_NE(pproc, nullptr); taosProcRun(pproc); @@ -160,12 +160,12 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = shm, - .parent = (void *)((int64_t)1235), - .name = "child_queue"}; + .parent = (void *)((int64_t)1236), + .name = "1236_c"}; SProcObj *cproc = taosProcInit(&cfg); ASSERT_NE(cproc, nullptr); - cfg.name = "parent_queue"; + cfg.name = "1236_p"; cfg.isChild = true; SProcObj *pproc = taosProcInit(&cfg); ASSERT_NE(pproc, nullptr); @@ -184,3 +184,61 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { taosProcCleanup(cproc); taosDropShm(&shm); } + +void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { + SRpcMsg msg; + memcpy(&msg, pHead, headLen); + char body[2000] = {0}; + memcpy(body, pBody, bodyLen); + + uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent, + ftype, headLen, bodyLen, (int64_t)msg.handle, body); + rpcFreeCont(pBody); + taosFreeQitem(pHead); +} + +void processHandle(void *handle) { uDebug("----> remove handle:%" PRId64 " <----", (int64_t)handle); } + +TEST_F(UtilTesProc, 03_Handle) { + // uDebugFlag = 207; + shm.size = 3000; + ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0); + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)NULL, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = shm, + .parent = (void *)((int64_t)1235), + .name = "1237_p"}; + SProcObj *cproc = taosProcInit(&cfg); + ASSERT_NE(cproc, nullptr); + + for (int32_t j = 0; j < 1; j++) { + int32_t i = 0; + for (i = 0; i < 20; ++i) { + head.handle = (void *)((int64_t)i); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0); + } + + cfg.isChild = true; + cfg.name = "child_queue"; + SProcObj *pproc = taosProcInit(&cfg); + ASSERT_NE(pproc, nullptr); + taosProcRun(pproc); + taosProcCleanup(pproc); + + taosProcRemoveHandle(cproc, (void *)((int64_t)3)); + taosProcRemoveHandle(cproc, (void *)((int64_t)5)); + taosProcRemoveHandle(cproc, (void *)((int64_t)6)); + taosProcCloseHandles(cproc, processHandle); + } + + taosProcCleanup(cproc); + taosDropShm(&shm); +} From c53489204ce955e32e9f4c081a2e40aca37bd766 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 20:01:00 +0800 Subject: [PATCH 05/10] minor changes --- source/dnode/mgmt/main/dndExec.c | 2 +- source/dnode/mgmt/main/dndInt.c | 2 +- source/util/test/procTest.cpp | 13 ++++++++----- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/mgmt/main/dndExec.c b/source/dnode/mgmt/main/dndExec.c index b2d5732706..830cca34e0 100644 --- a/source/dnode/mgmt/main/dndExec.c +++ b/source/dnode/mgmt/main/dndExec.c @@ -89,7 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { } static void dndProcessProcHandle(void *handle) { - dInfo("handle:%p, the child process dies and send an offline rsp", handle); + dWarn("handle:%p, the child process dies and send an offline rsp", handle); SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE}; rpcSendResponse(&rpcMsg); } diff --git a/source/dnode/mgmt/main/dndInt.c b/source/dnode/mgmt/main/dndInt.c index e85fe8a9fc..089377c302 100644 --- a/source/dnode/mgmt/main/dndInt.c +++ b/source/dnode/mgmt/main/dndInt.c @@ -106,7 +106,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { } if (dndInitMsgHandle(pDnode) != 0) { - dError("failed to msg handles since %s", terrstr()); + dError("failed to init msg handles since %s", terrstr()); goto _OVER; } diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 04a227e52b..1d1c9f66ae 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -10,16 +10,14 @@ */ #include -#include "sut.h" #include "tprocess.h" #include "tqueue.h" #include "trpc.h" +#include "tlog.h" class UtilTesProc : public ::testing::Test { public: void SetUp() override { - test.InitLog("/tmp/td"); - // uDebugFlag = 207; shm.id = -1; for (int32_t i = 0; i < 4000; ++i) { body[i] = i % 26 + 'a'; @@ -29,19 +27,24 @@ class UtilTesProc : public ::testing::Test { head.msgType = 2; head.noResp = 3; head.persistHandle = 4; + + taosRemoveDir("/tmp/td"); + taosMkDir("/tmp/td"); + tstrncpy(tsLogDir, "/tmp/td", PATH_MAX); + if (taosInitLog("taosdlog", 1) != 0) { + printf("failed to init log file\n"); + } } void TearDown() override { taosDropShm(&shm); } public: static SRpcMsg head; static char body[4000]; - static Testbase test; static SShm shm; static void SetUpTestSuite() {} static void TearDownTestSuite() {} }; -Testbase UtilTesProc::test; SShm UtilTesProc::shm; char UtilTesProc::body[4000]; SRpcMsg UtilTesProc::head; From 79963b9c34e456f25b68c2015df3bf425ea02cd4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 20:37:50 +0800 Subject: [PATCH 06/10] rename node type --- source/dnode/mgmt/dm/dmHandle.c | 4 +- source/dnode/mgmt/exe/dndMain.c | 18 +++---- source/dnode/mgmt/inc/dndInt.h | 12 ++--- source/dnode/mgmt/main/dndEnv.c | 4 +- source/dnode/mgmt/main/dndExec.c | 16 +++--- source/dnode/mgmt/main/dndFile.c | 6 +-- source/dnode/mgmt/main/dndInt.c | 8 +-- source/dnode/mgmt/main/dndTransport.c | 2 +- source/util/src/tprocess.c | 3 +- source/util/test/procTest.cpp | 72 +++++++++++++++------------ 10 files changed, 78 insertions(+), 67 deletions(-) diff --git a/source/dnode/mgmt/dm/dmHandle.c b/source/dnode/mgmt/dm/dmHandle.c index 7d627e5870..cb712bfb48 100644 --- a/source/dnode/mgmt/dm/dmHandle.c +++ b/source/dnode/mgmt/dm/dmHandle.c @@ -118,7 +118,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { } -static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); if (pWrapper != NULL) { dndReleaseWrapper(pWrapper); @@ -146,7 +146,7 @@ static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg return code; } -static int32_t dmProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) { +static int32_t dmProcessDropNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); if (pWrapper == NULL) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; diff --git a/source/dnode/mgmt/exe/dndMain.c b/source/dnode/mgmt/exe/dndMain.c index 48510e9622..997c56f9fb 100644 --- a/source/dnode/mgmt/exe/dndMain.c +++ b/source/dnode/mgmt/exe/dndMain.c @@ -18,15 +18,15 @@ #include "tconfig.h" static struct { - bool dumpConfig; - bool generateGrant; - bool printAuth; - bool printVersion; - char envFile[PATH_MAX]; - char apolloUrl[PATH_MAX]; - SArray *pArgs; // SConfigPair - SDnode *pDnode; - ENodeType ntype; + bool dumpConfig; + bool generateGrant; + bool printAuth; + bool printVersion; + char envFile[PATH_MAX]; + char apolloUrl[PATH_MAX]; + SArray *pArgs; // SConfigPair + SDnode *pDnode; + EDndType ntype; } global = {0}; static void dndStopDnode(int signum, void *info, void *ctx) { diff --git a/source/dnode/mgmt/inc/dndInt.h b/source/dnode/mgmt/inc/dndInt.h index 20e61c43dd..5659becb20 100644 --- a/source/dnode/mgmt/inc/dndInt.h +++ b/source/dnode/mgmt/inc/dndInt.h @@ -49,7 +49,7 @@ extern "C" { #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType; +typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } EDndType; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; @@ -92,7 +92,7 @@ typedef struct SMgmtWrapper { char *path; int32_t refCount; SRWLatch latch; - ENodeType ntype; + EDndType ntype; bool deployed; bool required; EProcType procType; @@ -126,7 +126,7 @@ typedef struct SDnode { int32_t numOfDisks; uint16_t serverPort; bool dropped; - ENodeType ntype; + EDndType ntype; EDndStatus status; EDndEvent event; SStartupReq startup; @@ -137,8 +137,8 @@ typedef struct SDnode { // dndEnv.c const char *dndStatStr(EDndStatus stat); -const char *dndNodeLogStr(ENodeType ntype); -const char *dndNodeProcStr(ENodeType ntype); +const char *dndNodeLogStr(EDndType ntype); +const char *dndNodeProcStr(EDndType ntype); const char *dndEventStr(EDndEvent ev); // dndExec.c @@ -156,7 +156,7 @@ int32_t dndWriteShmFile(SDnode *pDnode); EDndStatus dndGetStatus(SDnode *pDnode); void dndSetStatus(SDnode *pDnode, EDndStatus stat); void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType nType); int32_t dndMarkWrapper(SMgmtWrapper *pWrapper); void dndReleaseWrapper(SMgmtWrapper *pWrapper); void dndHandleEvent(SDnode *pDnode, EDndEvent event); diff --git a/source/dnode/mgmt/main/dndEnv.c b/source/dnode/mgmt/main/dndEnv.c index 8bdc2867d2..3c3f2144ab 100644 --- a/source/dnode/mgmt/main/dndEnv.c +++ b/source/dnode/mgmt/main/dndEnv.c @@ -71,7 +71,7 @@ const char *dndStatStr(EDndStatus status) { } } -const char *dndNodeLogStr(ENodeType ntype) { +const char *dndNodeLogStr(EDndType ntype) { switch (ntype) { case VNODES: return "vnode"; @@ -88,7 +88,7 @@ const char *dndNodeLogStr(ENodeType ntype) { } } -const char *dndNodeProcStr(ENodeType ntype) { +const char *dndNodeProcStr(EDndType ntype) { switch (ntype) { case VNODES: return "taosv"; diff --git a/source/dnode/mgmt/main/dndExec.c b/source/dnode/mgmt/main/dndExec.c index 830cca34e0..6976b318c6 100644 --- a/source/dnode/mgmt/main/dndExec.c +++ b/source/dnode/mgmt/main/dndExec.c @@ -66,7 +66,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { } -static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { +static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) { char tstr[8] = {0}; char *args[6] = {0}; snprintf(tstr, sizeof(tstr), "%d", n); @@ -97,7 +97,7 @@ static void dndProcessProcHandle(void *handle) { static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("dnode run in single process"); - for (ENodeType n = DNODE; n < NODE_MAX; ++n) { + for (EDndType n = DNODE; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; @@ -110,7 +110,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { dndSetStatus(pDnode, DND_STAT_RUNNING); - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (EDndType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; if (pWrapper->fp.startFp == NULL) continue; @@ -142,7 +142,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { return -1; } - for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; @@ -170,7 +170,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { return -1; } - for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; @@ -203,7 +203,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { dInfo("dnode is about to stop"); dndSetStatus(pDnode, DND_STAT_STOPPED); - for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; if (pDnode->ntype == NODE_MAX) continue; @@ -218,13 +218,13 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { } break; } else { - for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { + for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; if (pDnode->ntype == NODE_MAX) continue; if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { - dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); + dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle); dndNewProc(pWrapper, n); } diff --git a/source/dnode/mgmt/main/dndFile.c b/source/dnode/mgmt/main/dndFile.c index 92e6cea3e1..4c42119ec4 100644 --- a/source/dnode/mgmt/main/dndFile.c +++ b/source/dnode/mgmt/main/dndFile.c @@ -164,7 +164,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { goto _OVER; } - for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { + for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); cJSON *shmid = cJSON_GetObjectItem(root, itemName); if (shmid && shmid->type == cJSON_Number) { @@ -180,7 +180,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { } if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { - for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { + for (EDndType ntype = DNODE; ntype < NODE_MAX; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; if (pWrapper->shm.id >= 0) { dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); @@ -226,7 +226,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) { } len += snprintf(content + len, MAXLEN - len, "{\n"); - for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { + for (EDndType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); if (ntype == NODE_MAX - 1) { diff --git a/source/dnode/mgmt/main/dndInt.c b/source/dnode/mgmt/main/dndInt.c index 089377c302..d406b0c02e 100644 --- a/source/dnode/mgmt/main/dndInt.c +++ b/source/dnode/mgmt/main/dndInt.c @@ -46,7 +46,7 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { } static void dndClearVars(SDnode *pDnode) { - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (EDndType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; taosMemoryFreeClear(pMgmt->path); } @@ -89,7 +89,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { smSetMgmtFp(&pDnode->wrappers[SNODE]); bmSetMgmtFp(&pDnode->wrappers[BNODE]); - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (EDndType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); @@ -134,7 +134,7 @@ _OVER: void dndClose(SDnode *pDnode) { if (pDnode == NULL) return; - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (EDndType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; dndCloseNode(pWrapper); } @@ -149,7 +149,7 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) { } } -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) { +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, EDndType ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pRetWrapper = pWrapper; diff --git a/source/dnode/mgmt/main/dndTransport.c b/source/dnode/mgmt/main/dndTransport.c index 22895dffcc..f3065bdcad 100644 --- a/source/dnode/mgmt/main/dndTransport.c +++ b/source/dnode/mgmt/main/dndTransport.c @@ -307,7 +307,7 @@ void dndCleanupTrans(SDnode *pDnode) { int32_t dndInitMsgHandle(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (EDndType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index e8abb38b9f..74adfbd976 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -478,6 +478,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { (*HandleFp)(handle); h = taosHashIterate(pProc->hash, h); } + taosHashClear(pProc->hash); taosThreadMutexUnlock(&pProc->pChildQueue->mutex); } @@ -485,7 +486,7 @@ void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, c ProcFuncType ftype) { int32_t retry = 0; while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { - uInfo("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); + uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); retry++; taosMsleep(retry); } diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 1d1c9f66ae..54aaf49673 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -10,10 +10,20 @@ */ #include +#include "tlog.h" #include "tprocess.h" #include "tqueue.h" -#include "trpc.h" -#include "tlog.h" + +typedef struct STestMsg { + uint16_t msgType; + void *pCont; + int contLen; + int32_t code; + void *handle; // rpc handle returned to app + void *ahandle; // app handle set by client + int noResp; // has response or not(default 0, 0: resp, 1: no resp); + int persistHandle; // persist handle or not +} STestMsg; class UtilTesProc : public ::testing::Test { public: @@ -38,7 +48,7 @@ class UtilTesProc : public ::testing::Test { void TearDown() override { taosDropShm(&shm); } public: - static SRpcMsg head; + static STestMsg head; static char body[4000]; static SShm shm; static void SetUpTestSuite() {} @@ -47,7 +57,7 @@ class UtilTesProc : public ::testing::Test { SShm UtilTesProc::shm; char UtilTesProc::body[4000]; -SRpcMsg UtilTesProc::head; +STestMsg UtilTesProc::head; TEST_F(UtilTesProc, 00_Init_Cleanup) { ASSERT_EQ(taosCreateShm(&shm, 1234, 1024 * 1024 * 2), 0); @@ -56,13 +66,13 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) { SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL, .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .childFreeBodyFp = (ProcFreeFp)taosMemoryMalloc, .parentConsumeFp = (ProcConsumeFp)NULL, .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeBodyFp = (ProcFreeFp)taosMemoryMalloc, .shm = shm, .parent = &shm, .name = "1234"}; @@ -80,14 +90,14 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) { } void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { - SRpcMsg msg; + STestMsg msg; memcpy(&msg, pHead, headLen); char body[2000] = {0}; memcpy(body, pBody, bodyLen); uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <====", (int64_t)parent, ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); - rpcFreeCont(pBody); + taosMemoryFree(pBody); taosFreeQitem(pHead); } @@ -97,13 +107,13 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild1, .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .childFreeBodyFp = (ProcFreeFp)taosMemoryFree, .parentConsumeFp = (ProcConsumeFp)NULL, .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeBodyFp = (ProcFreeFp)taosMemoryFree, .shm = shm, .parent = (void *)((int64_t)1235), .name = "1235_c"}; @@ -116,14 +126,14 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, shm.size, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_REQ), 0); for (int32_t j = 0; j < 1000; j++) { int32_t i = 0; for (i = 0; i < 20; ++i) { - ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0); } - ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0); cfg.isChild = true; cfg.name = "1235_p"; @@ -138,14 +148,14 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { } void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { - SRpcMsg msg; + STestMsg msg; memcpy(&msg, pHead, headLen); char body[2000] = {0}; memcpy(body, pBody, bodyLen); uDebug("----> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d head:%d:%d:%d:%d body:%s <----", (int64_t)parent, ftype, headLen, bodyLen, msg.code, msg.msgType, msg.noResp, msg.persistHandle, body); - rpcFreeCont(pBody); + taosMemoryFree(pBody); taosMemoryFree(pHead); } @@ -155,13 +165,13 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)NULL, .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .childFreeBodyFp = (ProcFreeFp)taosMemoryFree, .parentConsumeFp = (ProcConsumeFp)ConsumeParent1, .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeBodyFp = (ProcFreeFp)taosMemoryFree, .shm = shm, .parent = (void *)((int64_t)1236), .name = "1236_c"}; @@ -176,7 +186,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { for (int32_t j = 0; j < 1000; j++) { int32_t i = 0; for (i = 0; i < 20; ++i) { - taosProcPutToParentQ(pproc, &head, sizeof(SRpcMsg), body, i, PROC_REQ); + taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_REQ); } taosProcRun(cproc); @@ -189,14 +199,14 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { } void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { - SRpcMsg msg; + STestMsg msg; memcpy(&msg, pHead, headLen); char body[2000] = {0}; memcpy(body, pBody, bodyLen); uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent, ftype, headLen, bodyLen, (int64_t)msg.handle, body); - rpcFreeCont(pBody); + taosMemoryFree(pBody); taosFreeQitem(pHead); } @@ -209,13 +219,13 @@ TEST_F(UtilTesProc, 03_Handle) { SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3, .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .childMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .childFreeBodyFp = (ProcFreeFp)taosMemoryFree, .parentConsumeFp = (ProcConsumeFp)NULL, .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentMallocBodyFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeBodyFp = (ProcFreeFp)taosMemoryFree, .shm = shm, .parent = (void *)((int64_t)1235), .name = "1237_p"}; @@ -226,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) { int32_t i = 0; for (i = 0; i < 20; ++i) { head.handle = (void *)((int64_t)i); - ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0); } cfg.isChild = true; From 5d835bc5b1b5acf344d7398bee43a0e4b39ba166 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 21:16:31 +0800 Subject: [PATCH 07/10] cfg for multi process --- include/common/tglobal.h | 7 +++++++ source/common/src/tglobal.c | 21 ++++++++++++++++++++- source/util/src/tprocess.c | 1 - 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 089cb5bb94..94f294d2bf 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -51,7 +51,14 @@ extern int32_t tsCompatibleModel; extern bool tsEnableSlaveQuery; extern bool tsPrintAuth; extern int64_t tsTickPerDay[3]; + +// multi-process extern bool tsMultiProcess; +extern int32_t tsMnodeShmSize; +extern int32_t tsVnodeShmSize; +extern int32_t tsQnodeShmSize; +extern int32_t tsSnodeShmSize; +extern int32_t tsBnodeShmSize; // monitor extern bool tsEnableMonitor; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c79e153122..3f4639a9dc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -45,7 +45,14 @@ float tsRatioOfQueryCores = 1.0f; int32_t tsMaxBinaryDisplayWidth = 30; bool tsEnableSlaveQuery = 1; bool tsPrintAuth = 0; -bool tsMultiProcess = 0; + +// multi process +bool tsMultiProcess = false; +int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 1; +int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10; +int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4; +int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4; +int32_t tsBnodeShmSize = TSDB_MAX_WAL_SIZE * 4; // monitor bool tsEnableMonitor = 1; @@ -347,7 +354,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; + if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; + // if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, 4096, INT32_MAX, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; @@ -466,7 +479,13 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; + tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; + tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32; + tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32; + tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32; + tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32; + tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 74adfbd976..ad7ed51d4e 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -21,7 +21,6 @@ #include "tlog.h" #include "tqueue.h" -#define SHM_DEFAULT_SIZE (20 * 1024 * 1024) typedef void *(*ProcThreadFp)(void *param); typedef struct SProcQueue { From 82023019922b999905e48f6d0e219be628334da0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 6 Apr 2022 21:53:58 +0800 Subject: [PATCH 08/10] fix dead lock in shm --- source/common/src/tglobal.c | 4 ++-- source/dnode/mgmt/main/dndExec.c | 15 ++++++++++++++- source/util/src/tprocess.c | 2 +- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3f4639a9dc..97306f2da0 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -48,7 +48,7 @@ bool tsPrintAuth = 0; // multi process bool tsMultiProcess = false; -int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 1; +int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2; int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10; int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4; int32_t tsSnodeShmSize = TSDB_MAX_WAL_SIZE * 4; @@ -485,7 +485,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsVnodeShmSize = cfgGetItem(pCfg, "vnodeShmSize")->i32; tsQnodeShmSize = cfgGetItem(pCfg, "qnodeShmSize")->i32; tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32; - tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32; + // tsBnodeShmSize = cfgGetItem(pCfg, "bnodeShmSize")->i32; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/main/dndExec.c b/source/dnode/mgmt/main/dndExec.c index 6976b318c6..28b24e97c9 100644 --- a/source/dnode/mgmt/main/dndExec.c +++ b/source/dnode/mgmt/main/dndExec.c @@ -147,7 +147,20 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; - int32_t shmsize = 1024 * 1024 * 2; // size will be a configuration item + int32_t shmsize = tsMnodeShmSize; + if (n == VNODES) { + shmsize = tsVnodeShmSize; + } else if (n == QNODE) { + shmsize = tsQnodeShmSize; + } else if (n == SNODE) { + shmsize = tsSnodeShmSize; + } else if (n == MNODE) { + shmsize = tsMnodeShmSize; + } else if (n == BNODE) { + shmsize = tsBnodeShmSize; + } else { + } + if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) { terrno = TAOS_SYSTEM_ERROR(terrno); dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr()); diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index ad7ed51d4e..2cafd3f7f6 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -256,7 +256,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea rawBodyLen = *(int32_t *)(pQueue->pBuffer + 4); } int16_t headLen = CEIL8(rawHeadLen); - int16_t bodyLen = CEIL8(rawBodyLen); + int32_t bodyLen = CEIL8(rawBodyLen); void *pHead = (*mallocHeadFp)(headLen); void *pBody = (*mallocBodyFp)(bodyLen); From f6d3ba6265f874298da10e9178453957f29fdb39 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 7 Apr 2022 10:04:32 +0800 Subject: [PATCH 09/10] fix CI error --- source/util/src/tconfig.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index ce9ef5b1c0..e7e870e998 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -348,6 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { } } + uError("name:%s, cfg not found", name); terrno = TSDB_CODE_CFG_NOT_FOUND; return NULL; } From 0d63e9c2fc150cb8ad9f671123814e9388a095b6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 7 Apr 2022 10:21:27 +0800 Subject: [PATCH 10/10] minor changes --- source/os/src/osShm.c | 8 +++++++- tests/script/jenkins/basic.txt | 6 +++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index bf784f14ac..35aec30f39 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -20,7 +20,13 @@ int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = -1; - int32_t shmid = shmget(0X95270000 + key, shmsize, IPC_CREAT | 0600); + // key_t shkey = IPC_PRIVATE; + // int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600; + + key_t __shkey = 0X95270000 + key; + int32_t __shmflag = IPC_CREAT | 0600; + + int32_t shmid = shmget(__shkey, shmsize, __shmflag); if (shmid < 0) { return -1; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 5a3ee003f0..4e04543fd8 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -55,8 +55,8 @@ # --- for multi process mode -./test.sh -f tsim/user/basic1.sim -m -./test.sh -f tsim/stable/vnode3.sim -m -./test.sh -f tsim/tmq/basic.sim -m +# ./test.sh -f tsim/user/basic1.sim -m +# ./test.sh -f tsim/stable/vnode3.sim -m +# ./test.sh -f tsim/tmq/basic.sim -m #======================b1-end===============