refactor transport

This commit is contained in:
yihaoDeng 2024-09-03 19:40:13 +08:00
parent ea2eacadd3
commit 603de3976c
3 changed files with 356 additions and 344 deletions

View File

@ -63,6 +63,7 @@ typedef struct SRpcHandleInfo {
int8_t forbiddenIp;
int8_t notFreeAhandle;
int8_t compressed;
int32_t seqNum;
} SRpcHandleInfo;
typedef struct SRpcMsg {
@ -125,8 +126,8 @@ typedef struct SRpcInit {
int32_t timeToGetConn;
int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize;
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
int8_t shareConn; // 0: no share, 1. share
int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait
void *parent;
} SRpcInit;

View File

@ -169,7 +169,9 @@ static void doCloseIdleConn(void* param);
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port);
static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn);
static void cliSendBatch_shareConnCb(uv_write_t* req, int status);
void cliSendBatch_shareConn(SCliConn* pConn);
int32_t cliSend2(SCliConn* conn);
// register conn timer
static void cliConnTimeout(uv_timer_t* handle);
// register timer for read
@ -236,9 +238,9 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq);
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq);
static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq);
static void cliDealReq(queue* h, SCliThrd* pThrd);
static void cliBatchDealReq(queue* h, SCliThrd* pThrd);
static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDealReq, cliBatchDealReq};
static void cliDoReq(queue* h, SCliThrd* pThrd);
static void cliDoBatchReq(queue* h, SCliThrd* pThrd);
static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDoReq, cliDoBatchReq};
static void (*cliAsyncHandle[])(SCliThrd* pThrd, SCliReq* pReq) = {cliHandleReq, cliHandleQuit, cliHandleRelease,
NULL, cliHandleUpdate, cliHandleFreeById};
@ -392,7 +394,7 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->reqs)) {
SCliReq* pCliMsg = NULL;
CONN_GET_NEXT_SENDMSG(conn);
(void)cliSend(conn);
(void)cliSend2(conn);
return true;
}
return false;
@ -419,7 +421,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
(void)transQueuePush(&conn->reqs, t);
tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)cliSend(conn);
(void)cliSend2(conn);
return true;
}
taosWUnLockLatch(&exh->latch);
@ -457,20 +459,6 @@ void cliResetConnTimer(SCliConn* conn) {
void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
SCliReq* cliFindReqBySeq(SCliConn* conn, int32_t seq) {
SCliReq* pReq = NULL;
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
pReq = transQueueGet(&conn->reqs, i);
if (pReq->seq == seq) {
transQueueRm(&conn->reqs, i);
break;
}
}
if (pReq == NULL) {
ASSERT(0);
}
return pReq;
}
bool cliShouldAddConnToPool(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
bool empty = transQueueEmpty(&conn->reqs);
@ -521,7 +509,6 @@ void cliHandleResp2(SCliConn* conn) {
cliResetConnTimer(conn);
// int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0);
if (msgLen < 0) {
@ -539,7 +526,7 @@ void cliHandleResp2(SCliConn* conn) {
}
SCliReq* pReq = NULL;
int32_t seq = pHead->seqNum;
int32_t seq = htonl(pHead->seqNum);
code = cliGetReqBySeq(conn, seq, &pReq);
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code));
@ -574,206 +561,191 @@ void cliHandleResp2(SCliConn* conn) {
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
void cliHandleResp(SCliConn* conn) {
int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
// void cliHandleResp(SCliConn* conn) {
// int32_t code = 0;
// SCliThrd* pThrd = conn->hostThrd;
// STrans* pInst = pThrd->pInst;
cliResetConnTimer(conn);
// cliResetConnTimer(conn);
STransMsgHead* pHead = NULL;
// STransMsgHead* pHead = NULL;
int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
taosMemoryFree(pHead);
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
// TODO: notify cb
pThrd->notifyExceptCb(pThrd, NULL, NULL);
return;
}
// int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
// int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
// if (msgLen <= 0) {
// taosMemoryFree(pHead);
// tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
// // TODO: notify cb
// pThrd->notifyExceptCb(pThrd, NULL, NULL);
// return;
// }
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pInst), conn);
}
// if (resetBuf == 0) {
// tTrace("%s conn %p not reset read buf", transLabel(pInst), conn);
// }
if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
// TODO: notify cb
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
if (cliRecvReleaseReq(conn, pHead)) {
return;
}
// if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) {
// tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
// // TODO: notify cb
// }
// pHead->code = htonl(pHead->code);
// pHead->msgLen = htonl(pHead->msgLen);
// if (cliRecvReleaseReq(conn, pHead)) {
// return;
// }
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead);
transMsg.code = pHead->code;
transMsg.msgType = pHead->msgType;
transMsg.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId;
transMsg.info.hasEpSet = pHead->hasEpSet;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
// STransMsg transMsg = {0};
// transMsg.contLen = transContLenFromMsg(pHead->msgLen);
// transMsg.pCont = transContFromHead((char*)pHead);
// transMsg.code = pHead->code;
// transMsg.msgType = pHead->msgType;
// transMsg.info.ahandle = NULL;
// transMsg.info.traceId = pHead->traceId;
// transMsg.info.hasEpSet = pHead->hasEpSet;
// transMsg.info.cliVer = htonl(pHead->compatibilityVer);
SCliReq* pReq = NULL;
SReqCtx* pCtx = NULL;
if (CONN_NO_PERSIST_BY_APP(conn)) {
pReq = transQueuePop(&conn->reqs);
// SCliReq* pReq = NULL;
// SReqCtx* pCtx = NULL;
// if (CONN_NO_PERSIST_BY_APP(conn)) {
// pReq = transQueuePop(&conn->reqs);
pCtx = pReq ? pReq->ctx : NULL;
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
} else {
uint64_t ahandle = (uint64_t)pHead->ahandle;
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
if (pReq == NULL) {
transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
transMsg.info.ahandle);
}
} else {
pCtx = pReq->ctx;
transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
}
}
// buf's mem alread translated to transMsg.pCont
if (!CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.info.handle = (void*)conn->refId;
transMsg.info.refId = (int64_t)(void*)conn->refId;
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
}
// pCtx = pReq ? pReq->ctx : NULL;
// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
// tDebug("%s conn %p get ahandle %p, persist: 0", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
// } else {
// uint64_t ahandle = (uint64_t)pHead->ahandle;
// CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle);
// if (pReq == NULL) {
// transMsg.info.ahandle = transCtxDumpVal(&conn->ctx, transMsg.msgType);
// tDebug("%s conn %p construct ahandle %p by %s, persist: 1", CONN_GET_INST_LABEL(conn), conn,
// transMsg.info.ahandle, TMSG_INFO(transMsg.msgType));
// if (!CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
// transMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
// transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&conn->ctx, (int32_t*)&(transMsg.msgType));
// tDebug("%s conn %p construct ahandle %p due brokenlink, persist: 1", CONN_GET_INST_LABEL(conn), conn,
// transMsg.info.ahandle);
// }
// } else {
// pCtx = pReq->ctx;
// transMsg.info.ahandle = pCtx ? pCtx->ahandle : NULL;
// tDebug("%s conn %p get ahandle %p, persist: 1", CONN_GET_INST_LABEL(conn), conn, transMsg.info.ahandle);
// }
// }
// // buf's mem alread translated to transMsg.pCont
// if (!CONN_NO_PERSIST_BY_APP(conn)) {
// transMsg.info.handle = (void*)conn->refId;
// transMsg.info.refId = (int64_t)(void*)conn->refId;
// tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
// }
STraceId* trace = &transMsg.info.traceId;
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
// STraceId* trace = &transMsg.info.traceId;
// tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, code str:%s", CONN_GET_INST_LABEL(conn), conn,
// TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, tstrerror(transMsg.code));
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
transFreeMsg(transMsg.pCont);
return;
}
if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
transFreeMsg(transMsg.pCont);
return;
}
// if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transFreeMsg(transMsg.pCont);
// return;
// }
// if (CONN_RELEASE_BY_SERVER(conn) && transMsg.info.ahandle == NULL) {
// tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
// transFreeMsg(transMsg.pCont);
// return;
// }
if (pReq == NULL || (pReq && pReq->type != Release)) {
if (cliNotifyCb(conn, pReq, &transMsg) != 0) {
return;
}
}
int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle));
tDebug("conn %p msg refId: %" PRId64 "", conn, refId);
destroyReq(pReq);
// if (pReq == NULL || (pReq && pReq->type != Release)) {
// if (cliNotifyCb(conn, pReq, &transMsg) != 0) {
// return;
// }
// }
// int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle));
// tDebug("conn %p msg refId: %" PRId64 "", conn, refId);
// destroyReq(pReq);
if (cliConnSendSeqMsg(refId, conn)) {
return;
}
// if (cliConnSendSeqMsg(refId, conn)) {
// return;
// }
if (cliMaySendCachedMsg(conn) == true) {
return;
}
// if (cliMaySendCachedMsg(conn) == true) {
// return;
// }
if (CONN_NO_PERSIST_BY_APP(conn)) {
return addConnToPool(pThrd->pool, conn);
}
// if (CONN_NO_PERSIST_BY_APP(conn)) {
// return addConnToPool(pThrd->pool, conn);
// }
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
static void cliDestroyMsgInExhandle(int64_t refId) {
if (refId == 0) return;
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh) {
taosWLockLatch(&exh->latch);
while (!QUEUE_IS_EMPTY(&exh->q)) {
queue* h = QUEUE_HEAD(&exh->q);
QUEUE_REMOVE(h);
SCliReq* t = QUEUE_DATA(h, SCliReq, seqq);
destroyReq(t);
}
taosWUnLockLatch(&exh->latch);
(void)transReleaseExHandle(transGetRefMgt(), refId);
}
}
// (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
// }
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (transQueueEmpty(&pConn->reqs)) {
if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn);
transUnrefCliHandle(pConn);
return;
}
}
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
bool once = false;
do {
SCliReq* pReq = transQueuePop(&pConn->reqs);
// if (transQueueEmpty(&pConn->reqs)) {
// if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
// tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
// if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn);
// transUnrefCliHandle(pConn);
// return;
// }
// }
// SCliThrd* pThrd = pConn->hostThrd;
// STrans* pInst = pThrd->pInst;
// bool once = false;
// do {
// SCliReq* pReq = transQueuePop(&pConn->reqs);
if (pReq == NULL && once) {
break;
}
// if (pReq == NULL && once) {
// break;
// }
if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) {
destroyReq(pReq);
break;
}
// if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) {
// destroyReq(pReq);
// break;
// }
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
// SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg transMsg = {0};
transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL;
transMsg.info.cliVer = pInst->compatibilityVer;
// STransMsg transMsg = {0};
// transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
// transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0;
// transMsg.info.ahandle = NULL;
// transMsg.info.cliVer = pInst->compatibilityVer;
if (pReq == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
TMSG_INFO(transMsg.msgType));
if (transMsg.info.ahandle == NULL) {
int32_t msgType = 0;
transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
transMsg.msgType = msgType;
tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
transMsg.info.ahandle);
}
} else {
transMsg.info.ahandle = (pReq != NULL && pReq->type != Release && pCtx) ? pCtx->ahandle : NULL;
}
// if (pReq == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
// transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
// tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
// TMSG_INFO(transMsg.msgType));
// if (transMsg.info.ahandle == NULL) {
// int32_t msgType = 0;
// transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
// transMsg.msgType = msgType;
// tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
// transMsg.info.ahandle);
// }
// } else {
// transMsg.info.ahandle = (pReq != NULL && pReq->type != Release && pCtx) ? pCtx->ahandle : NULL;
// }
if (pCtx == NULL || pCtx->pSem == NULL) {
if (transMsg.info.ahandle == NULL) {
if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) {
destroyReq(pReq);
once = true;
continue;
}
}
}
// if (pCtx == NULL || pCtx->pSem == NULL) {
// if (transMsg.info.ahandle == NULL) {
// if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) {
// destroyReq(pReq);
// once = true;
// continue;
// }
// }
// }
if (pReq == NULL || (pReq && pReq->type != Release)) {
int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle));
cliDestroyMsgInExhandle(refId);
if (cliNotifyCb(pConn, pReq, &transMsg) != 0) {
return;
}
}
destroyReq(pReq);
tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
} while (!transQueueEmpty(&pConn->reqs));
if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn);
transUnrefCliHandle(pConn);
// if (pReq == NULL || (pReq && pReq->type != Release)) {
// int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle));
// cliDestroyMsgInExhandle(refId);
// if (cliNotifyCb(pConn, pReq, &transMsg) != 0) {
// return;
// }
// }
// destroyReq(pReq);
// tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, T_REF_VAL_GET(pConn));
// } while (!transQueueEmpty(&pConn->reqs));
// if (T_REF_VAL_GET(pConn) > 1) transUnrefCliHandle(pConn);
// transUnrefCliHandle(pConn);
}
void cliHandleExcept(SCliConn* conn, int32_t code) {
tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
@ -946,6 +918,7 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
tDebug("conn %p get from pool, pool size:%d, dst:%s", conn, conn->list->size, conn->dstAddr);
*ppConn = conn;
return 0;
}
@ -989,79 +962,79 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliReq** pReq) {
plist->list = nList;
}
STraceId* trace = &(*pReq)->msg.info.traceId;
// no avaliable conn in pool
if (QUEUE_IS_EMPTY(&plist->conns)) {
SMsgList* list = plist->list;
if ((list)->numOfConn >= pInst->connLimitNum) {
STraceId* trace = &(*pReq)->msg.info.traceId;
if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pReq)->msg.msgType))) {
tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pReq)->msg.msgType),
tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
doNotifyCb(*pReq, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
*pReq = NULL;
return NULL;
}
// STraceId* trace = &(*pReq)->msg.info.traceId;
// // no avaliable conn in pool
// if (QUEUE_IS_EMPTY(&plist->conns)) {
// SMsgList* list = plist->list;
// if ((list)->numOfConn >= pInst->connLimitNum) {
// STraceId* trace = &(*pReq)->msg.info.traceId;
// if (pInst->notWaitAvaliableConn || (pInst->noDelayFp != NULL && pInst->noDelayFp((*pReq)->msg.msgType))) {
// tDebug("%s msg %s not to send, reason: %s", pInst->label, TMSG_INFO((*pReq)->msg.msgType),
// tstrerror(TSDB_CODE_RPC_NETWORK_BUSY));
// doNotifyCb(*pReq, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);
// *pReq = NULL;
// return NULL;
// }
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) {
doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pReq = NULL;
return NULL;
}
arg->param1 = *pReq;
arg->param2 = pThrd;
// STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
// if (arg == NULL) {
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
// arg->param1 = *pReq;
// arg->param2 = pThrd;
SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
if (task == NULL) {
taosMemoryFree(arg);
doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pReq = NULL;
return NULL;
}
(*pReq)->ctx->task = task;
tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q);
*pReq = NULL;
} else {
// send msg in delay queue
if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
if (arg == NULL) {
doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pReq = NULL;
return NULL;
}
arg->param1 = *pReq;
arg->param2 = pThrd;
// SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
// if (task == NULL) {
// taosMemoryFree(arg);
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
// (*pReq)->ctx->task = task;
// tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
// QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q);
// *pReq = NULL;
// } else {
// // send msg in delay queue
// if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
// STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
// if (arg == NULL) {
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
// arg->param1 = *pReq;
// arg->param2 = pThrd;
SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
if (task == NULL) {
taosMemoryFree(arg);
doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
*pReq = NULL;
return NULL;
}
// SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pInst->timeToGetConn);
// if (task == NULL) {
// taosMemoryFree(arg);
// doNotifyCb(*pReq, pThrd, TSDB_CODE_OUT_OF_MEMORY);
// *pReq = NULL;
// return NULL;
// }
(*pReq)->ctx->task = task;
tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
// (*pReq)->ctx->task = task;
// tGTrace("%s msg %s delay to send, wait for avaiable connect", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q);
queue* h = QUEUE_HEAD(&(list)->msgQ);
QUEUE_REMOVE(h);
SCliReq* ans = QUEUE_DATA(h, SCliReq, q);
// QUEUE_PUSH(&(list)->msgQ, &(*pReq)->q);
// queue* h = QUEUE_HEAD(&(list)->msgQ);
// QUEUE_REMOVE(h);
// SCliReq* ans = QUEUE_DATA(h, SCliReq, q);
*pReq = ans;
// *pReq = ans;
trace = &(*pReq)->msg.info.traceId;
tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
}
list->numOfConn++;
}
tDebug("%s numOfConn: %d, limit: %d, dst:%s", pInst->label, list->numOfConn, pInst->connLimitNum, key);
return NULL;
}
// trace = &(*pReq)->msg.info.traceId;
// tGTrace("%s msg %s pop from delay queue, start to send", pInst->label, TMSG_INFO((*pReq)->msg.msgType));
// transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
// }
// list->numOfConn++;
// }
// tDebug("%s numOfConn: %d, limit: %d, dst:%s", pInst->label, list->numOfConn, pInst->connLimitNum, key);
// return NULL;
// }
queue* h = QUEUE_TAIL(&plist->conns);
plist->size -= 1;
@ -1081,6 +1054,8 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
conn->seq = 0;
int32_t code = allocConnRef(conn, true);
if (code != 0) {
cliDestroyConn(conn, true);
@ -1115,7 +1090,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
(void)transQueuePush(&conn->reqs, pReq);
conn->status = ConnNormal;
(void)cliSend(conn);
(void)cliSend2(conn);
return;
}
@ -1219,7 +1194,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
cliHandleExcept(conn, -1);
break;
} else {
cliHandleResp(conn);
cliHandleResp2(conn);
}
}
return;
@ -1373,59 +1348,59 @@ static void cliDestroy(uv_handle_t* handle) {
taosMemoryFree(conn);
}
static bool cliHandleNoResp(SCliConn* conn) {
bool res = false;
if (!transQueueEmpty(&conn->reqs)) {
SCliReq* pReq = transQueueGet(&conn->reqs, 0);
if (REQUEST_NO_RESP(&pReq->msg)) {
(void)transQueuePop(&conn->reqs);
destroyReq(pReq);
res = true;
}
if (res == true) {
if (cliMaySendCachedMsg(conn) == false) {
SCliThrd* thrd = conn->hostThrd;
addConnToPool(thrd->pool, conn);
res = false;
} else {
res = true;
}
}
}
return res;
}
// static bool cliHandleNoResp(SCliConn* conn) {
// bool res = false;
// if (!transQueueEmpty(&conn->reqs)) {
// SCliReq* pReq = transQueueGet(&conn->reqs, 0);
// if (REQUEST_NO_RESP(&pReq->msg)) {
// (void)transQueuePop(&conn->reqs);
// destroyReq(pReq);
// res = true;
// }
// if (res == true) {
// if (cliMaySendCachedMsg(conn) == false) {
// SCliThrd* thrd = conn->hostThrd;
// addConnToPool(thrd->pool, conn);
// res = false;
// } else {
// res = true;
// }
// }
// }
// return res;
// }
static void cliSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status);
SCliConn* pConn = transReqQueueRemove(req);
if (pConn == NULL) return;
// SCliConn* pConn = transReqQueueRemove(req);
// if (pConn == NULL) return;
SCliReq* pReq = transQueueGet(&pConn->reqs, 0);
if (pReq != NULL) {
int64_t cost = taosGetTimestampUs() - pReq->st;
if (cost > 1000 * 50) {
tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
}
}
if (pReq != NULL && pReq->msg.contLen == 0 && pReq->msg.pCont != 0) {
rpcFreeCont(pReq->msg.pCont);
pReq->msg.pCont = 0;
}
// SCliReq* pReq = transQueueGet(&pConn->reqs, 0);
// if (pReq != NULL) {
// int64_t cost = taosGetTimestampUs() - pReq->st;
// if (cost > 1000 * 50) {
// tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
// }
// }
// if (pReq != NULL && pReq->msg.contLen == 0 && pReq->msg.pCont != 0) {
// rpcFreeCont(pReq->msg.pCont);
// pReq->msg.pCont = 0;
// }
if (status == 0) {
tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
} else {
if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
cliHandleExcept(pConn, -1);
}
return;
}
if (cliHandleNoResp(pConn) == true) {
tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
return;
}
(void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
// if (status == 0) {
// tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);
// } else {
// if (!uv_is_closing((uv_handle_t*)&pConn->stream)) {
// tError("%s conn %p failed to write:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(status));
// cliHandleExcept(pConn, -1);
// }
// return;
// }
// if (cliHandleNoResp(pConn) == true) {
// tTrace("%s conn %p no resp required", CONN_GET_INST_LABEL(pConn), pConn);
// return;
// }
// (void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
@ -1459,9 +1434,34 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
if (T_REF_VAL_GET(conn) > 1) transUnrefCliHandle(conn);
transUnrefCliHandle(conn);
}
static void cliConnRmReqs(SCliConn* conn) {
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
transQueueRm(&conn->reqs, i);
destroyReq(pReq);
}
}
}
static int32_t cliShouldSendMsg(SCliConn* conn) {
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pReq->sent == 0) {
// pReq->sent = 1;
// pReq->seq = conn->seq;
return 1;
}
}
return 0;
}
static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
SCliConn* conn = req->data;
SCliThrd* pThrd = conn->hostThrd;
conn->shareCnt -= 1;
cliConnRmReqs(conn);
if (status != 0) {
tDebug("%s conn %p failed to send batch msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status));
if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
@ -1469,7 +1469,16 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
}
return;
}
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
int ret = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
if (ret != 0) {
tError("%s conn %p failed to start read, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(ret));
cliHandleBatch_shareConnExcept(conn);
}
if (cliShouldSendMsg(conn) == 1) {
cliSendBatch_shareConn(conn);
}
taosMemoryFree(req);
}
void cliSendBatch_shareConn(SCliConn* pConn) {
@ -1518,7 +1527,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
pHead->seqNum = pConn->seq;
pHead->seqNum = htonl(pConn->seq);
if (pHead->comp == 0) {
if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
@ -1532,7 +1541,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
totalLen += msgLen;
pCliMsg->sent = 1;
pCliMsg->seq = pHead->seqNum;
pCliMsg->seq = pConn->seq;
}
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
@ -1577,7 +1586,7 @@ void cliSendBatch(SCliConn* pConn) {
}
pReq->contLen = 0;
}
pConn->seq++;
int msgLen = transMsgLenFromCont(pReq->contLen);
STransMsgHead* pHead = transHeadFromCont(pReq->pCont);
@ -1595,6 +1604,7 @@ void cliSendBatch(SCliConn* pConn) {
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
pHead->seqNum = htonl(pConn->seq);
if (pHead->comp == 0 && pReq->info.compressed == 0 && pConn->clientIp != pConn->serverIp) {
if (pInst->compressSize != -1 && pInst->compressSize < pReq->contLen) {
@ -1637,10 +1647,14 @@ _exception:
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0;
transQueuePush(&pConn->reqs, pCliMsg);
code = cliSend(pConn);
code = cliSend2(pConn);
return code;
}
int32_t cliSend2(SCliConn* pConn) {
cliSendBatch_shareConn(pConn);
return 0;
}
int32_t cliSend(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
@ -1677,7 +1691,7 @@ int32_t cliSend(SCliConn* pConn) {
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->version = TRANS_VER;
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
pHead->seqNum = pConn->seq++;
pHead->seqNum = htonl(pConn->seq++);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
@ -1957,7 +1971,7 @@ void cliConnCb(uv_connect_t* req, int status) {
return cliSendBatch_shareConn(pConn);
}
(void)cliSend(pConn);
(void)cliSend2(pConn);
}
static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) {
@ -2019,7 +2033,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) {
if (!transQueuePush(&conn->reqs, pReq)) {
return;
}
(void)cliSend(conn);
(void)cliSend2(conn);
} else {
tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn);
destroyReq(pReq);
@ -2298,15 +2312,15 @@ _exception:
}
void cliHandleReq(SCliThrd* pThrd, SCliReq* pReq) {
STrans* pInst = pThrd->pInst;
if (pInst->shareConn == 1) {
return cliHandleReq__shareConn(pThrd, pReq);
} else {
return cliHandleReq__noShareConn(pThrd, pReq);
}
// STrans* pInst = pThrd->pInst;
// if (pInst->shareConn == 1) {
// return cliHandleReq__shareConn(pThrd, pReq);
// } else {
return cliHandleReq__noShareConn(pThrd, pReq);
//}
}
static void cliDealReq(queue* wq, SCliThrd* pThrd) {
static void cliDoReq(queue* wq, SCliThrd* pThrd) {
int count = 0;
while (!QUEUE_IS_EMPTY(wq)) {
@ -2464,7 +2478,7 @@ static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* p
*ppBatch = pBatch;
return 0;
}
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) {
STrans* pInst = pThrd->pInst;
int32_t code = 0;
@ -2674,9 +2688,8 @@ _err:
terrno = code;
return NULL;
}
int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = pThrd;
SCliThrd* pThrd = thrd;
return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr);
}
int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) {
@ -3264,12 +3277,6 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst;
// if (pReq == NULL || pReq->ctx == NULL) {
// tTrace("%s conn %p handle resp", pInst->label, pConn);
// pInst->cfp(pInst->parent, pResp, NULL);
// return 0;
// }
if (cliMayRetry(pConn, pReq, pResp)) {
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
}

View File

@ -65,9 +65,9 @@ typedef struct SSvrMsg {
STransMsg msg;
queue q;
STransMsgType type;
void* arg;
FilteFunc func;
int32_t seqNum;
void* arg;
FilteFunc func;
} SSvrMsg;
@ -471,7 +471,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code;
if (pHead->seqNum != 0) {
if (pHead->seqNum == 0) {
ASSERT(0);
}
// pHead->noResp = 1,
@ -487,6 +487,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = forbiddenIp;
transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0;
transMsg.info.seqNum = htonl(pHead->seqNum);
uvMaySetConnAcquired(pConn, pHead);
@ -652,6 +653,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer);
pHead->version = TRANS_VER;
pHead->seqNum = htonl(pMsg->info.seqNum);
// handle invalid drop_task resp, TD-20098
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
@ -793,6 +795,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
msg->seqNum = transMsg.info.seqNum;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);