From d595fe6ba1b70b24199fb26962e0bccb1242e03b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 22 Mar 2022 20:54:31 +0800 Subject: [PATCH] add ahandle --- source/dnode/mgmt/container/src/dndInt.c | 2 +- source/libs/transport/inc/transComm.h | 18 ++++-- source/libs/transport/src/transCli.c | 74 ++++++++++++++++++------ source/libs/transport/src/transComm.c | 26 ++++++++- source/libs/transport/src/transSrv.c | 5 +- 5 files changed, 95 insertions(+), 30 deletions(-) diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index 8ad4351a88..dc1bde6b06 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -135,6 +135,6 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); - SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; + SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = NULL}; rpcSendResponse(&rpcRsp); } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 32a0cf0d54..962acfee2c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -158,7 +158,8 @@ typedef struct { char secured : 2; char spi : 2; - uint32_t code; // del later + uint64_t ahandle; // ahandle assigned by client + uint32_t code; // del later uint32_t msgType; int32_t msgLen; uint8_t content[0]; // message body starts from here @@ -296,20 +297,25 @@ void transQueueInit(STransQueue* queue, void (*free)(void* arg)); * if queue'size > 1, return false; else return true */ bool transQueuePush(STransQueue* queue, void* arg); +/* + * the size of queue + */ +int32_t transQueueSize(STransQueue* queue); /* * pop head from queue */ - void* transQueuePop(STransQueue* queue); /* - * get head from queue + * get ith from queue */ -void* transQueueGet(STransQueue* queue); - +void* transQueueGet(STransQueue* queue, int i); +/* + * rm ith from queue + */ +void* transQueueRm(STransQueue* queue, int i); /* * queue empty or not */ - bool transQueueEmpty(STransQueue* queue); /* * clear queue diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 59c4ffad06..8694d4098c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -25,12 +25,11 @@ typedef struct SCliConn { void* hostThrd; SConnBuffer readBuf; void* data; - // SArray* cliMsgs; - STransQueue cliMsgs; - queue conn; - uint64_t expireTime; - int hThrdIdx; - STransCtx ctx; + STransQueue cliMsgs; + queue conn; + uint64_t expireTime; + int hThrdIdx; + STransCtx ctx; bool broken; // link broken or not ConnStatus status; // @@ -151,6 +150,22 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) +#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ + do { \ + int i = 0, sz = transQueueSize(&conn->cliMsgs); \ + for (; i < sz; i++) { \ + pMsg = transQueueGet(&conn->cliMsgs, i); \ + if (pMsg != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ + break; \ + } \ + } \ + if (i == sz) { \ + pMsg = NULL; \ + } else { \ + pMsg = transQueueRm(&conn->cliMsgs, i); \ + } \ + } while (0) + #define CONN_HANDLE_THREAD_QUIT(thrd) \ do { \ if (thrd->quit) { \ @@ -205,16 +220,36 @@ void cliHandleResp(SCliConn* conn) { CONN_SHOULD_RELEASE(conn, pHead); - SCliMsg* pMsg = transQueuePop(&conn->cliMsgs); + SCliMsg* pMsg = NULL; + STransConnCtx* pCtx = NULL; - STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; - if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { - transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); - if (transMsg.ahandle == NULL) { - transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + if (CONN_NO_PERSIST_BY_APP(conn)) { + pMsg = transQueuePop(&conn->cliMsgs); + pCtx = pMsg ? pMsg->ctx : NULL; + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + } + tDebug("cli conn %p construct ahandle %p, persist: 0", conn, transMsg.ahandle); + } else { + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + tDebug("cli conn %p get ahandle %p, persist: 0", conn, transMsg.ahandle); } } else { - transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + uint64_t ahandle = (uint64_t)pHead->ahandle; + CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + if (pMsg == NULL) { + transMsg.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType); + if (transMsg.ahandle == NULL) { + transMsg.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType)); + } + tDebug("cli conn %p construct ahandle %p, persist: 1", conn, transMsg.ahandle); + } else { + pCtx = pMsg ? pMsg->ctx : NULL; + transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + tDebug("cli conn %p get ahandle %p, persist: 1", conn, transMsg.ahandle); + } } // buf's mem alread translated to transMsg.pCont transClearBuffer(&conn->readBuf); @@ -259,8 +294,6 @@ void cliHandleResp(SCliConn* conn) { if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } -_RETURN: - return; } void cliHandleExcept(SCliConn* pConn) { @@ -282,11 +315,14 @@ void cliHandleExcept(SCliConn* pConn) { transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.ahandle = NULL; + transMsg.handle = pConn; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); + tDebug("cli conn %p construct msgType %s ahandle %p", pConn, TMSG_INFO(transMsg.msgType), transMsg.ahandle); if (transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); + tDebug("cli conn %p construct brokenlink ahandle %p", pConn, transMsg.ahandle); } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -472,7 +508,7 @@ static void cliDestroy(uv_handle_t* handle) { static bool cliHandleNoResp(SCliConn* conn) { bool res = false; if (!transQueueEmpty(&conn->cliMsgs)) { - SCliMsg* pMsg = transQueueGet(&conn->cliMsgs); + SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0); if (REQUEST_NO_RESP(&pMsg->msg)) { transQueuePop(&conn->cliMsgs); // taosArrayRemove(msgs, 0); @@ -510,7 +546,7 @@ void cliSend(SCliConn* pConn) { // assert(taosArrayGetSize(pConn->cliMsgs) > 0); assert(!transQueueEmpty(&pConn->cliMsgs)); - SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs); + SCliMsg* pCliMsg = transQueueGet(&pConn->cliMsgs, 0); STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; @@ -522,7 +558,9 @@ void cliSend(SCliConn* pConn) { pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); - int msgLen = transMsgLenFromCont(pMsg->contLen); + pHead->ahandle = (uint64_t)pCtx->ahandle; + + int msgLen = transMsgLenFromCont(pMsg->contLen); if (!pConn->secured) { char* buf = calloc(1, msgLen + sizeof(STransUserMsg)); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 5684c332c0..87355ac8d0 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -305,14 +305,34 @@ void* transQueuePop(STransQueue* queue) { taosArrayRemove(queue->q, 0); return ptr; } - -void* transQueueGet(STransQueue* queue) { +int32_t transQueueSize(STransQueue* queue) { + // Get size + return taosArrayGetSize(queue->q); +} +void* transQueueGet(STransQueue* queue, int i) { if (taosArrayGetSize(queue->q) == 0) { return NULL; } - void* ptr = taosArrayGetP(queue->q, 0); + if (i >= taosArrayGetSize(queue->q)) { + return NULL; + } + + void* ptr = taosArrayGetP(queue->q, i); return ptr; } + +void* transQueueRm(STransQueue* queue, int i) { + if (taosArrayGetSize(queue->q) == 0) { + return NULL; + } + if (i >= taosArrayGetSize(queue->q)) { + return NULL; + } + void* ptr = taosArrayGetP(queue->q, i); + taosArrayRemove(queue->q, i); + return ptr; +} + bool transQueueEmpty(STransQueue* queue) { // return taosArrayGetSize(queue->q) == 0; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index b514bdba3f..3ff893b365 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -190,7 +190,7 @@ static void uvHandleReq(SSrvConn* pConn) { transMsg.pCont = pHead->content; transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - transMsg.ahandle = NULL; + transMsg.ahandle = (void*)pHead->ahandle; transMsg.handle = NULL; transClearBuffer(&pConn->readBuf); @@ -280,7 +280,7 @@ void uvOnSendCb(uv_write_t* req, int status) { destroySmsg(msg); // send second data, just use for push if (!transQueueEmpty(&conn->srvMsgs)) { - msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs); + msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0); if (msg->type == Register && conn->status == ConnAcquire) { conn->regArg.notifyCount = 0; conn->regArg.init = 1; @@ -326,6 +326,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); + pHead->ahandle = (uint64_t)pMsg->ahandle; // pHead->secured = pMsg->code == 0 ? 1 : 0; // if (!pConn->secured) {