diff --git a/source/dnode/mgmt/main/dndExec.c b/source/dnode/mgmt/main/dndExec.c index d76ef37a5d..b2d5732706 100644 --- a/source/dnode/mgmt/main/dndExec.c +++ b/source/dnode/mgmt/main/dndExec.c @@ -89,6 +89,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { } static void dndProcessProcHandle(void *handle) { + dInfo("handle:%p, the child process dies and send an offline rsp", handle); SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_DND_OFFLINE}; rpcSendResponse(&rpcMsg); } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 1e71e7a8a9..e8abb38b9f 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -476,6 +476,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { while (h != NULL) { void *handle = *((void **)h); (*HandleFp)(handle); + h = taosHashIterate(pProc->hash, h); } taosThreadMutexUnlock(&pProc->pChildQueue->mutex); } diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 9924b1bf4e..04a227e52b 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -103,7 +103,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = shm, .parent = (void *)((int64_t)1235), - .name = "child_queue"}; + .name = "1235_c"}; SProcObj *cproc = taosProcInit(&cfg); ASSERT_NE(cproc, nullptr); @@ -123,7 +123,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, 0, PROC_REQ), 0); cfg.isChild = true; - cfg.name = "child_queue"; + cfg.name = "1235_p"; SProcObj *pproc = taosProcInit(&cfg); ASSERT_NE(pproc, nullptr); taosProcRun(pproc); @@ -160,12 +160,12 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = shm, - .parent = (void *)((int64_t)1235), - .name = "child_queue"}; + .parent = (void *)((int64_t)1236), + .name = "1236_c"}; SProcObj *cproc = taosProcInit(&cfg); ASSERT_NE(cproc, nullptr); - cfg.name = "parent_queue"; + cfg.name = "1236_p"; cfg.isChild = true; SProcObj *pproc = taosProcInit(&cfg); ASSERT_NE(pproc, nullptr); @@ -184,3 +184,61 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { taosProcCleanup(cproc); taosDropShm(&shm); } + +void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { + SRpcMsg msg; + memcpy(&msg, pHead, headLen); + char body[2000] = {0}; + memcpy(body, pBody, bodyLen); + + uDebug("====> parent:%" PRId64 " ftype:%d, headLen:%d bodyLen:%d handle:%" PRId64 " body:%s <====", (int64_t)parent, + ftype, headLen, bodyLen, (int64_t)msg.handle, body); + rpcFreeCont(pBody); + taosFreeQitem(pHead); +} + +void processHandle(void *handle) { uDebug("----> remove handle:%" PRId64 " <----", (int64_t)handle); } + +TEST_F(UtilTesProc, 03_Handle) { + // uDebugFlag = 207; + shm.size = 3000; + ASSERT_EQ(taosCreateShm(&shm, 1237, shm.size), 0); + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)ConsumeChild3, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)NULL, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = shm, + .parent = (void *)((int64_t)1235), + .name = "1237_p"}; + SProcObj *cproc = taosProcInit(&cfg); + ASSERT_NE(cproc, nullptr); + + for (int32_t j = 0; j < 1; j++) { + int32_t i = 0; + for (i = 0; i < 20; ++i) { + head.handle = (void *)((int64_t)i); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(SRpcMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0); + } + + cfg.isChild = true; + cfg.name = "child_queue"; + SProcObj *pproc = taosProcInit(&cfg); + ASSERT_NE(pproc, nullptr); + taosProcRun(pproc); + taosProcCleanup(pproc); + + taosProcRemoveHandle(cproc, (void *)((int64_t)3)); + taosProcRemoveHandle(cproc, (void *)((int64_t)5)); + taosProcRemoveHandle(cproc, (void *)((int64_t)6)); + taosProcCloseHandles(cproc, processHandle); + } + + taosProcCleanup(cproc); + taosDropShm(&shm); +}