From 00c5307d513db41ffbe06437953fcf6c121095fc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 10 Mar 2022 20:48:38 +0800 Subject: [PATCH 1/4] refactor --- include/common/taosdef.h | 9 +++++-- include/common/tcommon.h | 38 +++++++++++++---------------- source/common/src/tmsg.c | 1 - source/dnode/mnode/impl/src/mnode.c | 2 ++ 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 69c2618ac8..1584fcb4bf 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -33,7 +33,7 @@ typedef enum { TSDB_SUPER_TABLE = 1, // super table TSDB_CHILD_TABLE = 2, // table created from super table TSDB_NORMAL_TABLE = 3, // ordinary table - TSDB_STREAM_TABLE = 4, // table created from stream computing + TSDB_STREAM_TABLE = 4, // table created by stream processing TSDB_TEMP_TABLE = 5, // temp table created by nest query TSDB_TABLE_MAX = 6 } ETableType; @@ -50,7 +50,12 @@ typedef enum { TSDB_CHECK_ITEM_MAX } ECheckItemType; -typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; +typedef enum { + TD_ROW_DISCARD_UPDATE = 0, + TD_ROW_OVERWRITE_UPDATE = 1, + TD_ROW_PARTIAL_UPDATE = 2, +} TDUpdateConfig; + typedef enum { TSDB_STATIS_OK = 0, // statis part exist and load successfully TSDB_STATIS_NONE = 1, // statis part not exist diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 1d3ab4f340..3eb9e88ff5 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -135,6 +135,23 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) return (void*)buf; } +static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + // int32_t numOfOutput = pBlock->info.numOfCols; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < sz; ++i) { + SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + // tfree(pBlock); +} + static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { int32_t tlen = 0; int32_t sz = 0; @@ -177,23 +194,6 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { return buf; } -static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { - if (pBlock == NULL) { - return; - } - - // int32_t numOfOutput = pBlock->info.numOfCols; - int32_t sz = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < sz; ++i) { - SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - // tfree(pBlock); -} - static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { if (pRsp->schemas) { if (pRsp->schemas->nCols) { @@ -203,10 +203,6 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { } taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); pRsp->pBlockData = NULL; - // for (int32_t i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { - // SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); - // tDeleteSSDataBlock(pDataBlock); - //} } //====================================================================================================================== diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 135ff34207..26861e1ff8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2434,7 +2434,6 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; - tEndEncode(&encoder); int32_t tlen = encoder.pos; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d3642f4204..f2baea1cd9 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -30,6 +30,7 @@ #include "mndShow.h" #include "mndSnode.h" #include "mndStb.h" +#include "mndStream.h" #include "mndSubscribe.h" #include "mndSync.h" #include "mndTelem.h" @@ -220,6 +221,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; From 36d4ef0af6cbe03e7a5418135fb4851e90382658 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 14 Mar 2022 20:32:19 +0800 Subject: [PATCH 2/4] refactor stream worker --- include/common/tmsg.h | 11 +++ include/dnode/snode/snode.h | 4 ++ source/dnode/mgmt/impl/inc/dndEnv.h | 6 +- source/dnode/mgmt/impl/inc/dndInt.h | 2 +- source/dnode/mgmt/impl/src/dndSnode.c | 92 +++++++++++++++++++++++--- source/dnode/mgmt/impl/src/dndWorker.c | 2 +- source/dnode/snode/src/snode.c | 12 ++++ source/dnode/vnode/src/tq/tqRead.c | 4 +- source/util/src/tworker.c | 2 +- 9 files changed, 118 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5c2c89b24..5a60761f11 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1118,6 +1118,17 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + int64_t streamId; + char* sql; + char* executorMsg; +} SMVCreateStreamReq, SMSCreateStreamReq; + +typedef struct { + int64_t streamId; +} SMVCreateStreamRsp, SMSCreateStreamRsp; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index c9fab140cc..21a93532e0 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -80,6 +80,10 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); */ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); + /** * @brief Drop a snode. * diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 13ef101908..aeea5386b4 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -90,9 +90,11 @@ typedef struct { int32_t refCount; int8_t deployed; int8_t dropped; + int8_t uniqueWorkerInUse; SSnode *pSnode; SRWLatch latch; - SDnodeWorker writeWorker; + SArray *uniqueWorkers; // SArray + SDnodeWorker sharedWorker; } SSnodeMgmt; typedef struct { @@ -153,4 +155,4 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo); } #endif -#endif /*_TD_DND_ENV_H_*/ \ No newline at end of file +#endif /*_TD_DND_ENV_H_*/ diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 4ca6b97ad4..a8530037da 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -70,4 +70,4 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); } #endif -#endif /*_TD_DND_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_INT_H_*/ diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 4906aef246..d192d9df01 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -19,7 +19,20 @@ #include "dndTransport.h" #include "dndWorker.h" -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); +typedef struct { + int32_t vgId; + int32_t refCount; + int32_t snVersion; + int8_t dropped; + char *path; + SSnode *pImpl; + STaosQueue *pSharedQ; + STaosQueue *pUniqueQ; +} SSnodeObj; + +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SSnode *dndAcquireSnode(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; @@ -152,8 +165,18 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) { - dError("failed to start snode write worker since %s", terrstr()); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + for (int32_t i = 0; i < 2; i++) { + SDnodeWorker uniqueWorker; + if (dndInitWorker(pDnode, &uniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { + dError("failed to start snode unique worker since %s", terrstr()); + return -1; + } + taosArrayPush(pMgmt->uniqueWorkers, &uniqueWorker); + } + if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, + dndProcessSnodeSharedQueue)) { + dError("failed to start snode shared worker since %s", terrstr()); return -1; } @@ -169,9 +192,13 @@ static void dndStopSnodeWorker(SDnode *pDnode) { while (pMgmt->refCount > 0) { taosMsleep(10); - } + } - dndCleanupWorker(&pMgmt->writeWorker); + for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { + SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); + dndCleanupWorker(worker); + } + taosArrayDestroy(pMgmt->uniqueWorkers); } static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { @@ -292,17 +319,36 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } } -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - SRpcMsg *pRsp = NULL; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; SSnode *pSnode = dndAcquireSnode(pDnode); if (pSnode != NULL) { - code = sndProcessMsg(pSnode, pMsg, &pRsp); + for (int32_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + + sndProcessUMsg(pSnode, pMsg); + + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + } + dndReleaseSnode(pDnode, pSnode); +} + +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = sndProcessSMsg(pSnode, pMsg); } dndReleaseSnode(pDnode, pSnode); +#if 0 if (pMsg->msgType & 1u) { if (pRsp != NULL) { pRsp->ahandle = pMsg->ahandle; @@ -314,11 +360,32 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rpcRsp); } } +#endif rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } +static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers); + SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index); + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseSnode(pDnode, pSnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; @@ -337,8 +404,13 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc } } -void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg); +void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + // judge from msg to write to unique queue + dndWriteSnodeMsgToRandomWorker(pDnode, pMsg); +} + +void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); } int32_t dndInitSnode(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index 5ccf6640c0..38f8737b2b 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -109,4 +109,4 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) } return 0; -} \ No newline at end of file +} diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 01500fbc54..91008dd03a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -31,3 +31,15 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } void sndDestroy(const char *path) {} + +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // stream deployment + // stream stop/resume + // operator exec + return 0; +} + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // operator exec + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 92a111298f..a2342ec85a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -83,8 +83,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - /*int32_t sversion = pHandle->pBlock->sversion;*/ - /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ + // currently only rows are used + pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 1657a85ee8..1fa70da870 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -188,7 +188,7 @@ void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueu int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; - pool->workers = calloc(sizeof(SWWorker), pool->max); + pool->workers = calloc(pool->max, sizeof(SWWorker)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; From 6044c7511800f470ac27eb74d019cccec113570e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 14 Mar 2022 20:51:13 +0800 Subject: [PATCH 3/4] fix worker init --- source/dnode/mgmt/impl/src/dndSnode.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index d192d9df01..b27a25680a 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -167,12 +167,15 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); for (int32_t i = 0; i < 2; i++) { - SDnodeWorker uniqueWorker; - if (dndInitWorker(pDnode, &uniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { + SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); + if (pUniqueWorker == NULL) { + return -1; + } + if (dndInitWorker(pDnode, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { dError("failed to start snode unique worker since %s", terrstr()); return -1; } - taosArrayPush(pMgmt->uniqueWorkers, &uniqueWorker); + taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker); } if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, dndProcessSnodeSharedQueue)) { From af3a8be54e69598c4f71a694e96a1955822c6f8c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 14 Mar 2022 22:21:30 +0800 Subject: [PATCH 4/4] add test UT --- source/libs/transport/src/trans.c | 35 +++--- source/libs/transport/src/transCli.c | 72 ++++++------ source/libs/transport/src/transComm.c | 98 +--------------- source/libs/transport/src/transSrv.c | 47 ++++---- source/libs/transport/test/transUT.cc | 161 ++++++++++++++++++-------- 5 files changed, 185 insertions(+), 228 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 015018f73f..58809ee3be 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -53,7 +53,6 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->secret) { memcpy(pRpc->secret, pInit->secret, strlen(pInit->secret)); } - return pRpc; } void rpcClose(void* arg) { @@ -113,34 +112,19 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } void rpcCancelRequest(int64_t rid) { return; } -int32_t rpcInit() { - // impl later - return 0; -} - -void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg* pMsg, int64_t *pRid) { +void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; - transSendRequest(shandle, ip, port, pMsg); + transSendRequest(shandle, ip, port, pMsg); } -void rpcSendRecv(void* shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); uint32_t port = pEpSet->eps[pEpSet->inUse].port; transSendRecv(shandle, ip, port, pMsg, pRsp); } -void rpcSendResponse(const SRpcMsg *pMsg) { - transSendResponse(pMsg); -} -int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { - return transGetConnInfo((void *)thandle, pInfo); -} - -void rpcCleanup(void) { - // impl later - // - return; -} +void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle}; void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle}; @@ -155,4 +139,13 @@ void rpcUnrefHandle(void* handle, int8_t type) { (*taosUnRefHandle[type])(handle); } +int32_t rpcInit() { + // impl later + return 0; +} +void rpcCleanup(void) { + // impl later + return; +} + #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4af02a982e..2f6ff3763f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -84,7 +84,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co // register timer in each thread to clear expire conn static void cliTimeoutCb(uv_timer_t* handle); // alloc buf for recv -static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // callback after read nbytes from socket static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after write data to socket @@ -154,50 +154,50 @@ void cliHandleResp(SCliConn* conn) { pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); - // buf's mem alread translated to rpcMsg.pCont - transClearBuffer(&conn->readBuf); - - STransMsg rpcMsg = {0}; - rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); - rpcMsg.pCont = transContFromHead((char*)pHead); - rpcMsg.code = pHead->code; - rpcMsg.msgType = pHead->msgType; - rpcMsg.ahandle = NULL; + STransMsg transMsg = {0}; + transMsg.contLen = transContLenFromMsg(pHead->msgLen); + transMsg.pCont = transContFromHead((char*)pHead); + transMsg.code = pHead->code; + transMsg.msgType = pHead->msgType; + transMsg.ahandle = NULL; SCliMsg* pMsg = conn->data; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; + transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; } else { - rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } // if (rpcMsg.ahandle == NULL) { // tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn); // return; //} - if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { - rpcMsg.handle = conn; + // buf's mem alread translated to transMsg.pCont + transClearBuffer(&conn->readBuf); + + if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, transMsg.msgType)) { + transMsg.handle = conn; CONN_SET_PERSIST_BY_APP(conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), - inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), rpcMsg.contLen); + inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); conn->secured = pHead->secured; if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); - (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); + (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); - memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); + memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); tsem_post(pCtx->pSem); } - uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); + uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); if (CONN_NO_PERSIST_BY_APP(conn)) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); @@ -224,23 +224,23 @@ void cliHandleExcept(SCliConn* pConn) { SCliMsg* pMsg = pConn->data; STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; - STransMsg rpcMsg = {0}; - rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; - rpcMsg.ahandle = NULL; + STransMsg transMsg = {0}; + transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + transMsg.ahandle = NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { - rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; + transMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, transMsg.msgType) : NULL; } else { - rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); - (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); + (pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); } else { tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, pConn); - memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); + memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); tsem_post(pCtx->pSem); } destroyCmsg(pConn->data); @@ -252,9 +252,9 @@ void cliHandleExcept(SCliConn* pConn) { void cliTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; - STrans* pRpc = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); + tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pTransInst->label); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -271,8 +271,8 @@ void cliTimeoutCb(uv_timer_t* handle) { p = taosHashIterate((SHashObj*)pThrd->pool, p); } - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); - uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); } void* createConnPool(int size) { @@ -324,15 +324,15 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); - STrans* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; + STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; - conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); + conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); } -static void cliAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); @@ -416,7 +416,7 @@ static void cliSendCb(uv_write_t* req, int status) { cliHandleExcept(pConn); return; } - uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); + uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } void cliSend(SCliConn* pConn) { @@ -581,14 +581,14 @@ static void* cliWorkThread(void* arg) { void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SCliObj* cli = calloc(1, sizeof(SCliObj)); - STrans* pRpc = shandle; + STrans* pTransInst = shandle; memcpy(cli->label, label, strlen(label)); cli->numOfThreads = numOfThreads; cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); for (int i = 0; i < cli->numOfThreads; i++) { SCliThrdObj* pThrd = createThrdObj(); - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->pTransInst = shandle; int err = pthread_create(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index c83f76c2ec..367cb33fc9 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -16,20 +16,6 @@ #include "transComm.h" -int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - int ret = -1; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; - - return ret; -} int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context; int ret = -1; @@ -44,17 +30,7 @@ int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { return ret; } -void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - memcpy(pAuth, context.digest, sizeof(context.digest)); -} void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context; @@ -67,45 +43,6 @@ void transBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { memcpy(pAuth, context.digest, sizeof(context.digest)); } -int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHead* pHead = rpcHeadFromCont(pCont); - int32_t finalLen = 0; - int overhead = sizeof(SRpcComp); - - if (!NEEDTO_COMPRESSS_MSG(contLen)) { - return contLen; - } - - char* buf = malloc(contLen + overhead + 8); // 8 extra bytes - if (buf == NULL) { - tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); - return contLen; - } - - int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); - tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); - - /* - * only the compressed size is less than the value of contLen - overhead, the compression is applied - * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message - */ - if (compLen > 0 && compLen < contLen - overhead) { - SRpcComp* pComp = (SRpcComp*)pCont; - pComp->reserved = 0; - pComp->contLen = htonl(contLen); - memcpy(pCont + overhead, buf, compLen); - - pHead->comp = 1; - tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); - finalLen = compLen + overhead; - } else { - finalLen = contLen; - } - - free(buf); - return finalLen; -} - bool transCompressMsg(char* msg, int32_t len, int32_t* flen) { return false; // SRpcHead* pHead = rpcHeadFromCont(pCont); @@ -154,39 +91,6 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen) { return false; } -SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { - int overhead = sizeof(SRpcComp); - SRpcHead* pNewHead = NULL; - uint8_t* pCont = pHead->content; - SRpcComp* pComp = (SRpcComp*)pHead->content; - - if (pHead->comp) { - // decompress the content - assert(pComp->reserved == 0); - int contLen = htonl(pComp->contLen); - - // prepare the temporary buffer to decompress message - char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD); - pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext - - if (pNewHead) { - int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; - int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen); - assert(origLen == contLen); - - memcpy(pNewHead, pHead, sizeof(SRpcHead)); - pNewHead->msgLen = rpcMsgLenFromCont(origLen); - /// rpcFreeMsg(pHead); // free the compressed message buffer - pHead = pNewHead; - tTrace("decomp malloc mem:%p", temp); - } else { - tError("failed to allocate memory to decompress msg, contLen:%d", contLen); - } - } - - return pHead; -} - void transConnCtxDestroy(STransConnCtx* ctx) { free(ctx->ip); free(ctx); @@ -315,7 +219,7 @@ int transSendAsync(SAsyncPool* pool, queue* q) { if (el > 50) { // tInfo("lock and unlock cost: %d", (int)el); } - return uv_async_send(async); } + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c236a69f4e..cb3bbaefec 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -58,12 +58,13 @@ typedef struct SWorkThrdObj { uv_os_fd_t fd; uv_loop_t* loop; SAsyncPool* asyncPool; - // uv_async_t* workerAsync; // + queue msg; - queue conn; pthread_mutex_t msgMtx; - void* pTransInst; - bool quit; + + queue conn; + void* pTransInst; + bool quit; } SWorkThrdObj; typedef struct SServerObj { @@ -90,7 +91,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen); static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnTimeoutCb(uv_timer_t* handle); static void uvOnSendCb(uv_write_t* req, int status); @@ -120,7 +121,7 @@ static void* acceptThread(void* arg); static bool addHandleToWorkloop(void* arg); static bool addHandleToAcceptloop(void* arg); -void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { +void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SSrvConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); @@ -162,7 +163,7 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int len) { tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); code = TSDB_CODE_RPC_INVALID_TIME_STAMP; } else { - if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + if (transAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { // tDebug("%s, authentication failed, msg discarded", pConn->info); code = TSDB_CODE_RPC_AUTH_FAILURE; } else { @@ -203,10 +204,6 @@ static void uvHandleReq(SSrvConn* pConn) { memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret)); } - - pConn->inType = pHead->msgType; - - STrans* pRpc = (STrans*)p->shandle; pHead->code = htonl(pHead->code); int32_t dlen = 0; @@ -219,21 +216,24 @@ static void uvHandleReq(SSrvConn* pConn) { // } - STransMsg rpcMsg; - rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); - rpcMsg.pCont = pHead->content; - rpcMsg.msgType = pHead->msgType; - rpcMsg.code = pHead->code; - rpcMsg.ahandle = NULL; - rpcMsg.handle = pConn; + STransMsg transMsg; + transMsg.contLen = transContLenFromMsg(pHead->msgLen); + transMsg.pCont = pHead->content; + transMsg.msgType = pHead->msgType; + transMsg.code = pHead->code; + transMsg.ahandle = NULL; + transMsg.handle = pConn; transClearBuffer(&pConn->readBuf); - + pConn->inType = pHead->msgType; transRefSrvHandle(pConn); - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(rpcMsg.msgType), + + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); - (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); + ntohs(pConn->locaddr.sin_port), transMsg.contLen); + + STrans* pTransInst = (STrans*)p->shandle; + (*((STrans*)p->shandle)->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type @@ -525,7 +525,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } - uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnRecvCb); + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb); } else { tDebug("failed to create new connection"); @@ -546,7 +546,6 @@ static bool addHandleToWorkloop(void* arg) { return false; } - // STrans* pRpc = pThrd->shandle; uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_open(pThrd->pipe, pThrd->fd); diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index 5edddb006b..fa20327003 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -29,24 +29,25 @@ const char *ckey = "ckey"; class Server; int port = 7000; // server process +typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); // client process; static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); class Client { public: void Init(int nThread) { - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 0; - rpcInit.label = (char *)label; - rpcInit.numOfThreads = nThread; - rpcInit.cfp = processResp; - rpcInit.user = (char *)user; - rpcInit.secret = (char *)secret; - rpcInit.ckey = (char *)ckey; - rpcInit.spi = 1; - rpcInit.parent = this; - rpcInit.connType = TAOS_CONN_CLIENT; - this->transCli = rpcOpen(&rpcInit); + memset(&rpcInit_, 0, sizeof(rpcInit_)); + rpcInit_.localPort = 0; + rpcInit_.label = (char *)label; + rpcInit_.numOfThreads = nThread; + rpcInit_.cfp = processResp; + rpcInit_.user = (char *)user; + rpcInit_.secret = (char *)secret; + rpcInit_.ckey = (char *)ckey; + rpcInit_.spi = 1; + rpcInit_.parent = this; + rpcInit_.connType = TAOS_CONN_CLIENT; + this->transCli = rpcOpen(&rpcInit_); tsem_init(&this->sem, 0, 0); } void SetResp(SRpcMsg *pMsg) { @@ -55,9 +56,27 @@ class Client { } SRpcMsg *Resp() { return &this->resp; } - void Restart() { + void Restart(CB cb) { rpcClose(this->transCli); - this->transCli = rpcOpen(&rpcInit); + rpcInit_.cfp = cb; + this->transCli = rpcOpen(&rpcInit_); + } + void setPersistFP(bool (*pfp)(void *parent, tmsg_t msgType)) { + rpcClose(this->transCli); + rpcInit_.pfp = pfp; + this->transCli = rpcOpen(&rpcInit_); + } + void setConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { + rpcClose(this->transCli); + rpcInit_.mfp = mfp; + this->transCli = rpcOpen(&rpcInit_); + } + void setPAndMFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { + rpcClose(this->transCli); + + rpcInit_.pfp = pfp; + rpcInit_.mfp = mfp; + this->transCli = rpcOpen(&rpcInit_); } void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) { @@ -79,7 +98,7 @@ class Client { private: tsem_t sem; - SRpcInit rpcInit; + SRpcInit rpcInit_; void * transCli; SRpcMsg resp; }; @@ -133,39 +152,56 @@ static void processResp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { client->SetResp(pMsg); client->SemPost(); } + +static void initEnv() { + dDebugFlag = 143; + vDebugFlag = 0; + mDebugFlag = 143; + cDebugFlag = 0; + jniDebugFlag = 0; + tmrDebugFlag = 143; + uDebugFlag = 143; + rpcDebugFlag = 143; + qDebugFlag = 0; + wDebugFlag = 0; + sDebugFlag = 0; + tsdbDebugFlag = 0; + tsLogEmbedded = 1; + tsAsyncLog = 0; + + std::string path = "/tmp/transport"; + taosRemoveDir(path.c_str()); + taosMkDir(path.c_str()); + + tstrncpy(tsLogDir, path.c_str(), PATH_MAX); + if (taosInitLog("taosdlog", 1) != 0) { + printf("failed to init log file\n"); + } +} class TransObj { public: TransObj() { - dDebugFlag = 143; - vDebugFlag = 0; - mDebugFlag = 143; - cDebugFlag = 0; - jniDebugFlag = 0; - tmrDebugFlag = 143; - uDebugFlag = 143; - rpcDebugFlag = 143; - qDebugFlag = 0; - wDebugFlag = 0; - sDebugFlag = 0; - tsdbDebugFlag = 0; - tsLogEmbedded = 1; - tsAsyncLog = 0; - - std::string path = "/tmp/transport"; - taosRemoveDir(path.c_str()); - taosMkDir(path.c_str()); - - tstrncpy(tsLogDir, path.c_str(), PATH_MAX); - if (taosInitLog("taosdlog", 1) != 0) { - printf("failed to init log file\n"); - } + initEnv(); cli = new Client; cli->Init(1); srv = new Server; srv->Start(); } - void RestartCli() { cli->Restart(); } + + void RestartCli(CB cb) { cli->Restart(cb); } void StopSrv() { srv->Stop(); } + void SetCliPersistFp(bool (*pfp)(void *parent, tmsg_t msgType)) { + // do nothing + cli->setPersistFP(pfp); + } + void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { + // do nothing + cli->setConstructFP(mfp); + } + void SetMAndPFp(bool (*pfp)(void *parent, tmsg_t msgType), void *(*mfp)(void *parent, tmsg_t msgType)) { + // do nothing + cli->setPAndMFp(pfp, mfp); + } void RestartSrv() { srv->Restart(); } void cliSendAndRecv(SRpcMsg *req, SRpcMsg *resp) { cli->SendAndRecv(req, resp); } ~TransObj() { @@ -191,16 +227,16 @@ class TransEnv : public ::testing::Test { TransObj *tr = NULL; }; -// TEST_F(TransEnv, 01sendAndRec) { -// for (int i = 0; i < 1; i++) { -// SRpcMsg req = {0}, resp = {0}; -// req.msgType = 0; -// req.pCont = rpcMallocCont(10); -// req.contLen = 10; -// tr->cliSendAndRecv(&req, &resp); -// assert(resp.code == 0); -// } -//} +TEST_F(TransEnv, 01sendAndRec) { + for (int i = 0; i < 1; i++) { + SRpcMsg req = {0}, resp = {0}; + req.msgType = 0; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + assert(resp.code == 0); + } +} TEST_F(TransEnv, 02StopServer) { for (int i = 0; i < 1; i++) { @@ -218,6 +254,31 @@ TEST_F(TransEnv, 02StopServer) { tr->StopSrv(); // tr->RestartSrv(); tr->cliSendAndRecv(&req, &resp); - assert(resp.code != 0); } +TEST_F(TransEnv, clientUserDefined) {} + +TEST_F(TransEnv, cliPersistHandle) { + // impl late +} +TEST_F(TransEnv, srvPersistHandle) { + // impl later +} + +TEST_F(TransEnv, srvPersisHandleExcept) { + // conn breken + // +} +TEST_F(TransEnv, cliPersisHandleExcept) { + // conn breken +} + +TEST_F(TransEnv, multiCliPersisHandleExcept) { + // conn breken +} +TEST_F(TransEnv, multiSrvPersisHandleExcept) { + // conn breken +} +TEST_F(TransEnv, queryExcept) { + // query and conn is broken +}