From 5e7da3511d8d413a5a3a0895f9987953415e9910 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 24 Apr 2022 18:22:00 +0800 Subject: [PATCH] fix(cluster): add refid in rpc, and handle it in multi-process mode --- include/util/tprocess.h | 4 +-- source/dnode/mgmt/implement/src/dmTransport.c | 4 +-- source/util/src/tprocess.c | 23 +++++++++----- source/util/test/procTest.cpp | 31 +++++++++++-------- 4 files changed, 37 insertions(+), 25 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 2b0fd89aa5..7e1767441e 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -53,8 +53,8 @@ int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - void *handle, EProcFuncType ftype); -void taosProcRemoveHandle(SProcObj *pProc, void *handle); + void *handle, int64_t handleRef, EProcFuncType ftype); +int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle); void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)); void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, EProcFuncType ftype); diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 404438238b..96bf69be31 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -88,7 +88,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user); code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, - PROC_FUNC_REQ); + pRpc->refId, PROC_FUNC_REQ); } else { dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); ASSERT(1); @@ -356,7 +356,7 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); break; case PROC_FUNC_RSP: - taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); + pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); dmSendRpcRsp(pWrapper->pDnode, pMsg); break; default: diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 03c9c255b0..d8343bc427 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -154,7 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, - const char *pBody, int32_t rawBodyLen, int64_t handle, EProcFuncType ftype) { + const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef, + EProcFuncType ftype) { if (rawHeadLen == 0 || pHead == NULL) { terrno = TSDB_CODE_INVALID_PARA; return -1; @@ -172,7 +173,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char } if (handle != 0 && ftype == PROC_FUNC_REQ) { - if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) { + if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) { taosThreadMutexUnlock(&pQueue->mutex); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -286,13 +287,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea pQueue->head = headLen + bodyLen; } else if (remain < 8 + headLen) { memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8); - memcpy((char*)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); + memcpy((char *)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen); pQueue->head = headLen - (remain - 8) + bodyLen; } else if (remain < 8 + headLen + bodyLen) { memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen); - memcpy((char*)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); + memcpy((char *)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); pQueue->head = bodyLen - (remain - 8 - headLen); } else { memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); @@ -454,19 +455,25 @@ void taosProcCleanup(SProcObj *pProc) { } int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - void *handle, EProcFuncType ftype) { + void *handle, int64_t handleRef, EProcFuncType ftype) { if (ftype != PROC_FUNC_REQ) { terrno = TSDB_CODE_INVALID_PARA; return -1; } - return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype); + return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, handleRef, + ftype); } -void taosProcRemoveHandle(SProcObj *pProc, void *handle) { +int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle) { int64_t h = (int64_t)handle; taosThreadMutexLock(&pProc->pChildQueue->mutex); + + int64_t *handleRef = taosHashGet(pProc->hash, &h, sizeof(int64_t)); taosHashRemove(pProc->hash, &h, sizeof(int64_t)); taosThreadMutexUnlock(&pProc->pChildQueue->mutex); + + if (handleRef == NULL) return 0; + return *handleRef; } void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { @@ -484,7 +491,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, EProcFuncType ftype) { int32_t retry = 0; - while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { + while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) { uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); retry++; taosMsleep(retry); diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 3c014369fb..7ffec04a40 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { SProcObj *cproc = taosProcInit(&cfg); ASSERT_NE(cproc, nullptr); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RSP), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REGIST), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RELEASE), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_FUNC_REQ), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REQ), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_FUNC_REQ), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RSP), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REGIST), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RELEASE), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, PROC_FUNC_REQ), 0); for (int32_t j = 0; j < 1000; j++) { int32_t i = 0; for (i = 0; i < 20; ++i) { - ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0); } - ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0); cfg.isChild = true; cfg.name = "1235_p"; @@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) { int32_t i = 0; for (i = 0; i < 20; ++i) { head.handle = (void *)((int64_t)i); - ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_FUNC_REQ), 0); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0); } cfg.isChild = true; @@ -246,9 +246,14 @@ TEST_F(UtilTesProc, 03_Handle) { taosProcRun(pproc); taosProcCleanup(pproc); - taosProcRemoveHandle(cproc, (void *)((int64_t)3)); - taosProcRemoveHandle(cproc, (void *)((int64_t)5)); - taosProcRemoveHandle(cproc, (void *)((int64_t)6)); + int64_t ref = 0; + + ref = taosProcRemoveHandle(cproc, (void *)((int64_t)3)); + EXPECT_EQ(ref, 3); + ref = taosProcRemoveHandle(cproc, (void *)((int64_t)5)); + EXPECT_EQ(ref, 5); + ref = taosProcRemoveHandle(cproc, (void *)((int64_t)6)); + EXPECT_EQ(ref, 6); taosProcCloseHandles(cproc, processHandle); }