opt transport

This commit is contained in:
yihaoDeng 2024-09-11 20:18:34 +08:00
parent 60cb1cbf6b
commit 65a5dad3cb
4 changed files with 173 additions and 361 deletions

View File

@ -372,7 +372,7 @@ typedef struct {
* init queue * init queue
* note: queue'size is small, default 1 * note: queue'size is small, default 1
*/ */
int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)); int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(void* arg));
/* /*
* put arg into queue * put arg into queue
@ -396,6 +396,10 @@ void* transQueueGet(STransQueue* queue, int i);
*/ */
void* tranQueueHead(STransQueue* q); void* tranQueueHead(STransQueue* q);
/*
* remove all match elm from queue
*/
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size);
/* /*
* rm ith from queue * rm ith from queue
*/ */

View File

@ -65,7 +65,8 @@ typedef struct SCliConn {
void* hostThrd; void* hostThrd;
SConnBuffer readBuf; SConnBuffer readBuf;
STransQueue reqs; STransQueue reqsToSend;
STransQueue reqsSentOut;
SHashObj* pQueryTable; SHashObj* pQueryTable;
queue q; queue q;
@ -340,57 +341,8 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p);
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pInst))->label)
// #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ // #define CONN_NO_PERSIST_BY_APP(conn) \
// do { \ // (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1)
// int i = 0, sz = transQueueSize(&conn->reqs); \
// for (; i < sz; i++) { \
// pReq = transQueueGet(&conn->reqs, i); \
// if (pReq->ctx != NULL && (uint64_t)pReq->ctx->ahandle == ahandle) { \
// break; \
// } \
// } \
// if (i == sz) { \
// pReq = NULL; \
// } else { \
// pReq = transQueueRm(&conn->reqs, i); \
// } \
// } while (0)
// #define CONN_GET_NEXT_SENDMSG(conn) \
// do { \
// int i = 0; \
// do { \
// pCliMsg = transQueueGet(&conn->reqs, i++); \
// if (pCliMsg && 0 == pCliMsg->sent) { \
// break; \
// } \
// } while (pCliMsg != NULL); \
// if (pCliMsg == NULL) { \
// goto _RETURN; \
// } \
// } while (0)
// static int32_t cliConnFindToSendMsg(SCliConn* pConn, SCliReq** pReq) {
// int32_t code = 0;
// for (int32_t i = 0; i < transQueueSize(&pConn->reqs); i++) {
// SCliReq* p = transQueueGet(&pConn->reqs, i);
// if (p->sent == 0) {
// *pReq = p;
// return 0;
// }
// }
// return TSDB_CODE_OUT_OF_RANGE;
// }
// #define CONN_SET_PERSIST_BY_APP(conn) \
// do { \
// if (conn->status == ConnNormal) { \
// conn->status = ConnAcquire; \
// transRefCliHandle(conn); \
// } \
// } while (0)
#define CONN_NO_PERSIST_BY_APP(conn) \
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1)
// #define CONN_RELEASE_BY_SERVER(conn) \ // #define CONN_RELEASE_BY_SERVER(conn) \
// (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1) // (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && TRANS_CONN_REF_GET(conn) == 1)
@ -412,64 +364,6 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p);
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
// static void cliReleaseUnfinishedMsg(SCliConn* conn) {
// SCliThrd* pThrd = conn->hostThrd;
// for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
// SCliReq* msg = transQueueGet(&conn->reqs, i);
// if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) {
// if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) {
// conn->ctx.freeFunc(msg->ctx->ahandle);
// } else if (msg->msg.info.notFreeAhandle == 0 && msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) {
// tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle);
// pThrd->destroyAhandleFp(msg->ctx->ahandle);
// }
// }
// destroyReq(msg);
// }
// transQueueClear(&conn->reqs);
// memset(&conn->ctx, 0, sizeof(conn->ctx));
// }
// bool cliMaySendCachedMsg(SCliConn* conn) {
// if (!transQueueEmpty(&conn->reqs)) {
// SCliReq* pCliMsg = NULL;
// CONN_GET_NEXT_SENDMSG(conn);
// (void)cliSend2(conn);
// return true;
// }
// return false;
// _RETURN:
// return false;
// }
// bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
// if (refId == 0) return false;
// SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
// if (exh == NULL) {
// tDebug("release conn %p, refId: %" PRId64 "", conn, refId);
// return false;
// }
// taosWLockLatch(&exh->latch);
// if (exh->handle == NULL) exh->handle = conn;
// exh->inited = 1;
// exh->pThrd = conn->hostThrd;
// if (!QUEUE_IS_EMPTY(&exh->q)) {
// queue* h = QUEUE_HEAD(&exh->q);
// QUEUE_REMOVE(h);
// taosWUnLockLatch(&exh->latch);
// SCliReq* t = QUEUE_DATA(h, SCliReq, seqq);
// transCtxMerge(&conn->ctx, &t->ctx->userCtx);
// (void)transQueuePush(&conn->reqs, t);
// tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId);
// (void)transReleaseExHandle(transGetRefMgt(), refId);
// (void)cliSend2(conn);
// return true;
// }
// taosWUnLockLatch(&exh->latch);
// tDebug("empty conn %p, refId: %" PRId64 "", conn, refId);
// (void)transReleaseExHandle(transGetRefMgt(), refId);
// return false;
// }
int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) { int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) { if (timer == NULL) {
@ -501,10 +395,10 @@ void cliHandleBatchResp(SCliConn* conn) { ASSERT(0); }
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int32_t code = 0; int32_t code = 0;
for (int i = 0; i < transQueueSize(&conn->reqs); i++) { for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) {
SCliReq* p = transQueueGet(&conn->reqs, i); SCliReq* p = transQueueGet(&conn->reqsSentOut, i);
if (p->seq == seq) { if (p->seq == seq) {
transQueueRm(&conn->reqs, i); transQueueRm(&conn->reqsSentOut, i);
*pReq = p; *pReq = p;
return 0; return 0;
} }
@ -514,7 +408,8 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int8_t cliMayRecycleConn(SCliConn* conn) { int8_t cliMayRecycleConn(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
if (transQueueSize(&conn->reqs) == 0 && taosHashGetSize(conn->pQTable) == 0) { if (transQueueSize(&conn->reqsToSend) == 0 && transQueueSize(&conn->reqsSentOut) == 0 &&
taosHashGetSize(conn->pQTable) == 0) {
(void)delConnFromHeapCache(pThrd->connHeapCache, conn); (void)delConnFromHeapCache(pThrd->connHeapCache, conn);
addConnToPool(pThrd->pool, conn); addConnToPool(pThrd->pool, conn);
return 1; return 1;
@ -522,6 +417,16 @@ int8_t cliMayRecycleConn(SCliConn* conn) {
return 0; return 0;
} }
bool filterByQid(void* key, void* arg) {
int64_t* qid = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->msg.info.qId == *qid) {
return true;
} else {
return false;
}
}
int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHead) { int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHead) {
pResp->contLen = transContLenFromMsg(pHead->msgLen); pResp->contLen = transContLenFromMsg(pHead->msgLen);
pResp->pCont = transContFromHead((char*)pHead); pResp->pCont = transContFromHead((char*)pHead);
@ -560,18 +465,22 @@ int32_t cliConnMayHandleState_releaseReq(SCliConn* conn, STransMsgHead* pHead) {
tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId); tDebug("%s conn %p failed to release req %ld from thrd ", CONN_GET_INST_LABEL(conn), conn, qId);
} }
tDebug("%s %p req size:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqs)); tDebug("%s %p reqToSend:%d, sentOut:%d", CONN_GET_INST_LABEL(conn), conn, transQueueSize(&conn->reqsToSend),
for (int32_t i = 0; i < transQueueSize(&conn->reqs); i++) { transQueueSize(&conn->reqsSentOut));
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pReq->msg.info.qId == qId) {
transQueueRm(&conn->reqs, i);
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) { queue set;
pThrd->destroyAhandleFp(pReq->ctx->ahandle); QUEUE_INIT(&set);
} transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1);
destroyReq(pReq); transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1);
i--;
while (!QUEUE_IS_EMPTY(&set)) {
queue* el = QUEUE_HEAD(&set);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
QUEUE_REMOVE(el);
if (pReq->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
pThrd->destroyAhandleFp(pReq->ctx->ahandle);
} }
destroyReq(pReq);
} }
taosMemoryFree(pHead); taosMemoryFree(pHead);
return 1; return 1;
@ -674,76 +583,7 @@ void cliHandleResp2(SCliConn* conn) {
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { return; }
// if (transQueueEmpty(&pConn->reqs)) {
// if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
// tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
// if (TRANS_CONN_REF_GET(pConn) > 1) transUnrefCliHandle(pConn);
// transUnrefCliHandle(pConn);
// return;
// }
// }
// SCliThrd* pThrd = pConn->hostThrd;
// STrans* pInst = pThrd->pInst;
// bool once = false;
// do {
// SCliReq* pReq = transQueuePop(&pConn->reqs);
// if (pReq == NULL && once) {
// break;
// }
// if (pReq != NULL && REQUEST_NO_RESP(&pReq->msg)) {
// destroyReq(pReq);
// break;
// }
// SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
// STransMsg transMsg = {0};
// transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
// transMsg.msgType = pReq ? pReq->msg.msgType + 1 : 0;
// transMsg.info.ahandle = NULL;
// transMsg.info.cliVer = pInst->compatibilityVer;
// if (pReq == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
// transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
// tDebug("%s conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.info.ahandle,
// TMSG_INFO(transMsg.msgType));
// if (transMsg.info.ahandle == NULL) {
// int32_t msgType = 0;
// transMsg.info.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, &msgType);
// transMsg.msgType = msgType;
// tDebug("%s conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
// transMsg.info.ahandle);
// }
// } else {
// transMsg.info.ahandle = (pReq != NULL && pReq->type != Release && pCtx) ? pCtx->ahandle : NULL;
// }
// if (pCtx == NULL || pCtx->pSem == NULL) {
// if (transMsg.info.ahandle == NULL) {
// if (pReq == NULL || REQUEST_NO_RESP(&pReq->msg) || pReq->type == Release) {
// destroyReq(pReq);
// once = true;
// continue;
// }
// }
// }
// if (pReq == NULL || (pReq && pReq->type != Release)) {
// int64_t refId = (pReq == NULL ? 0 : (int64_t)(pReq->msg.info.handle));
// cliDestroyMsgInExhandle(refId);
// if (cliNotifyCb(pConn, pReq, &transMsg) != 0) {
// return;
// }
// }
// destroyReq(pReq);
// tTrace("%s conn %p start to destroy, ref:%d", CONN_GET_INST_LABEL(pConn), pConn, TRANS_CONN_REF_GET(pConn));
// } while (!transQueueEmpty(&pConn->reqs));
// if (TRANS_CONN_REF_GET(pConn) > 1) transUnrefCliHandle(pConn);
// transUnrefCliHandle(pConn);
}
void cliHandleExcept(SCliConn* conn, int32_t code) { void cliHandleExcept(SCliConn* conn, int32_t code) {
tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn)); tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, TRANS_CONN_REF_GET(conn));
if (code != TSDB_CODE_RPC_FQDN_ERROR) { if (code != TSDB_CODE_RPC_FQDN_ERROR) {
@ -1068,22 +908,22 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SConnList* pList = conn->list; SConnList* pList = conn->list;
SMsgList* msgList = pList->list; SMsgList* msgList = pList->list;
if (!QUEUE_IS_EMPTY(&msgList->msgQ)) { // if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
queue* h = QUEUE_HEAD(&(msgList)->msgQ); // queue* h = QUEUE_HEAD(&(msgList)->msgQ);
QUEUE_REMOVE(h); // QUEUE_REMOVE(h);
SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); // SCliReq* pReq = QUEUE_DATA(h, SCliReq, q);
transDQCancel(thrd->waitConnQueue, pReq->ctx->task); // transDQCancel(thrd->waitConnQueue, pReq->ctx->task);
pReq->ctx->task = NULL; // pReq->ctx->task = NULL;
transCtxMerge(&conn->ctx, &pReq->ctx->userCtx); // transCtxMerge(&conn->ctx, &pReq->ctx->userCtx);
(void)transQueuePush(&conn->reqs, pReq); // (void)transQueuePush(&conn->reqsToSend, pReq);
conn->status = ConnNormal; // conn->status = ConnNormal;
(void)cliSend2(conn); // (void)cliSend2(conn);
return; // return;
} // }
conn->status = ConnInPool; conn->status = ConnInPool;
QUEUE_PUSH(&conn->list->conns, &conn->q); QUEUE_PUSH(&conn->list->conns, &conn->q);
@ -1210,7 +1050,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
} }
// static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) { // static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) {
// if (transQueuePush(&conn->reqs, pReq) != 0) { // if (transQueuePush(&conn->reqsToSend, pReq) != 0) {
// return TSDB_CODE_OUT_OF_MEMORY; // return TSDB_CODE_OUT_OF_MEMORY;
// } // }
// return 0; // return 0;
@ -1218,7 +1058,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { // static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) {
// // do nothing // // do nothing
// SCliReq* pTail = transQueuePop(&conn->reqs); // SCliReq* pTail = transQueuePop(&conn->reqsToSend);
// if (pTail == NULL) { // if (pTail == NULL) {
// return TSDB_CODE_INVALID_PARA; // return TSDB_CODE_INVALID_PARA;
// } // }
@ -1259,7 +1099,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn)
code = cliMayUpdateState(pThrd, pReq, pConn); code = cliMayUpdateState(pThrd, pReq, pConn);
addConnToHeapCache(pThrd->connHeapCache, pConn); addConnToHeapCache(pThrd->connHeapCache, pConn);
transQueuePush(&pConn->reqs, pReq); transQueuePush(&pConn->reqsToSend, &pReq->q);
return cliDoConn(pThrd, pConn); return cliDoConn(pThrd, pConn);
_exception: _exception:
// free conn // free conn
@ -1289,7 +1129,7 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
TAOS_CHECK_GOTO(transQueueInit(&conn->reqs, NULL), NULL, _failed); TAOS_CHECK_GOTO(transQueueInit(&conn->reqsToSend, NULL), NULL, _failed);
TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed);
@ -1331,7 +1171,8 @@ _failed:
destroyCliConnQTable(conn); destroyCliConnQTable(conn);
taosHashCleanup(conn->pQTable); taosHashCleanup(conn->pQTable);
(void)transDestroyBuffer(&conn->readBuf); (void)transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->reqs); transQueueDestroy(&conn->reqsToSend);
transQueueDestroy(&conn->reqsSentOut);
taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->dstAddr);
(void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transReleaseExHandle(transGetRefMgt(), conn->refId);
@ -1417,8 +1258,9 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
int32_t code = 0; int32_t code = 0;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
while (!transQueueEmpty(&conn->reqs)) { while (!transQueueEmpty(&conn->reqsToSend)) {
SCliReq* pReq = transQueuePop(&conn->reqs); queue* el = transQueuePop(&conn->reqsToSend);
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
// ASSERT(pReq->type != Release); // ASSERT(pReq->type != Release);
// ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0); // ASSERT(REQUEST_NO_RESP(&pReq->msg) == 0);
@ -1446,25 +1288,17 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) {
} }
static void cliConnRmReqs(SCliConn* conn) { static void cliConnRmReqs(SCliConn* conn) {
for (int i = 0; i < transQueueSize(&conn->reqs); i++) { // for (int i = 0; i < transQueueSize(&conn->reqsSentOut); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i); // SCliReq* pReq = transQueueGet(&conn->reqsSentOut, i);
if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) { // if (pReq->sent == 1 && REQUEST_NO_RESP(&pReq->msg)) {
transQueueRm(&conn->reqs, i); // transQueueRm(&conn->reqsToSend, i);
destroyReq(pReq); // destroyReq(pReq);
i--; // i--;
} // }
} // }
return;
} }
static int32_t cliShouldSendMsg(SCliConn* conn) {
for (int i = 0; i < transQueueSize(&conn->reqs); i++) {
SCliReq* pReq = transQueueGet(&conn->reqs, i);
if (pReq->sent == 0) {
return 1;
}
}
return 0;
}
static void cliSendBatch_shareConnCb(uv_write_t* req, int status) { static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
SCliConn* conn = req->data; SCliConn* conn = req->data;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
@ -1488,7 +1322,7 @@ static void cliSendBatch_shareConnCb(uv_write_t* req, int status) {
void cliSendBatch_shareConn(SCliConn* pConn) { void cliSendBatch_shareConn(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
int32_t size = transQueueSize(&pConn->reqs); int32_t size = transQueueSize(&pConn->reqsToSend);
int32_t totalLen = 0; int32_t totalLen = 0;
if (size == 0) { if (size == 0) {
@ -1498,11 +1332,9 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t));
int j = 0; int j = 0;
for (int i = 0; i < size; i++) { while (!transQueueEmpty(&pConn->reqsToSend)) {
SCliReq* pCliMsg = transQueueGet(&pConn->reqs, i); queue* h = transQueuePop(&pConn->reqsToSend);
if (pCliMsg->sent == 1) { SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q);
continue;
}
SReqCtx* pCtx = pCliMsg->ctx; SReqCtx* pCtx = pCliMsg->ctx;
pConn->seq++; pConn->seq++;
@ -1550,6 +1382,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
STraceId* trace = &pCliMsg->msg.info.traceId; STraceId* trace = &pCliMsg->msg.info.traceId;
tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn, tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%d, qid:%ld", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId);
transQueuePush(&pConn->reqsSentOut, pCliMsg->q);
} }
if (j == 0) { if (j == 0) {
taosMemoryFree(wb); taosMemoryFree(wb);
@ -1566,7 +1399,7 @@ void cliSendBatch_shareConn(SCliConn* pConn) {
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0; int32_t code = 0;
transQueuePush(&pConn->reqs, pCliMsg); transQueuePush(&pConn->reqsToSend, pCliMsg);
if (pConn->connnected) { if (pConn->connnected) {
code = cliSend2(pConn); code = cliSend2(pConn);
} else { } else {
@ -1663,7 +1496,7 @@ _exception2:
static void cliHandleFastFail_resp(SCliConn* pConn, int status) { static void cliHandleFastFail_resp(SCliConn* pConn, int status) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
SCliReq* pReq = transQueueGet(&pConn->reqs, 0); SCliReq* pReq = transQueueGet(&pConn->reqsToSend, 0);
STraceId* trace = &pReq->msg.info.traceId; STraceId* trace = &pReq->msg.info.traceId;
tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn),
@ -1800,7 +1633,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) {
if (TRANS_CONN_REF_GET(conn) == 2) { if (TRANS_CONN_REF_GET(conn) == 2) {
transUnrefCliHandle(conn); transUnrefCliHandle(conn);
if (!transQueuePush(&conn->reqs, pReq)) { if (!transQueuePush(&conn->reqsToSend, pReq)) {
return; return;
} }
(void)cliSend2(conn); (void)cliSend2(conn);
@ -1809,48 +1642,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) {
destroyReq(pReq); destroyReq(pReq);
} }
} }
static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { return; }
SReqCtx* pCtx = pReq->ctx;
pThrd->cvtAddr = pCtx->cvtAddr;
destroyReq(pReq);
}
static void cliHandleFreeById(SCliThrd* pThrd, SCliReq* pReq) {
int32_t code = 0;
int64_t refId = (int64_t)(pReq->msg.info.handle);
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
tDebug("id %" PRId64 " already released", refId);
destroyReq(pReq);
return;
}
taosRLockLatch(&exh->latch);
SCliConn* conn = exh->handle;
taosRUnLockLatch(&exh->latch);
if (conn == NULL || conn->refId != refId) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
}
tDebug("do free conn %p by id %" PRId64 "", conn, refId);
int32_t size = transQueueSize(&conn->reqs);
if (size == 0) {
// already recv, and notify upper layer
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _exception);
} else {
while (TRANS_CONN_REF_GET(conn) >= 1) {
transUnrefCliHandle(conn);
}
return;
}
_exception:
tDebug("already free conn %p by id %" PRId64 "", conn, refId);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transReleaseExHandle(transGetRefMgt(), refId);
(void)transRemoveExHandle(transGetRefMgt(), refId);
destroyReq(pReq);
}
SCliConn* cliGetConn(SCliReq** pReq, SCliThrd* pThrd, bool* ignore, char* addr) { SCliConn* cliGetConn(SCliReq** pReq, SCliThrd* pThrd, bool* ignore, char* addr) {
SReqCtx* pCtx = (*pReq)->ctx; SReqCtx* pCtx = (*pReq)->ctx;
@ -2322,9 +2114,9 @@ void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
// cliReleaseUnfinishedMsg(conn); // cliReleaseUnfinishedMsg(conn);
if (destroy == 1) { if (destroy == 1) {
transQueueDestroy(&conn->reqs); transQueueDestroy(&conn->reqsToSend);
} else { } else {
transQueueClear(&conn->reqs); transQueueClear(&conn->reqsToSend);
} }
} }
@ -2332,8 +2124,8 @@ void cliConnFreeMsgs(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
for (int i = 0; i < transQueueSize(&conn->reqs); i++) { for (int i = 0; i < transQueueSize(&conn->reqsToSend); i++) {
SCliReq* cmsg = transQueueGet(&conn->reqs, i); SCliReq* cmsg = transQueueGet(&conn->reqsToSend, i);
if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) { if (cmsg->type == Release || REQUEST_NO_RESP(&cmsg->msg) || cmsg->msg.msgType == TDMT_SCH_DROP_TASK) {
continue; continue;
} }
@ -3697,7 +3489,7 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) { int32_t compareHeapNode(const HeapNode* a, const HeapNode* b) {
SCliConn* args1 = container_of(a, SCliConn, node); SCliConn* args1 = container_of(a, SCliConn, node);
SCliConn* args2 = container_of(b, SCliConn, node); SCliConn* args2 = container_of(b, SCliConn, node);
if (transQueueSize(&args1->reqs) > transQueueSize(&args2->reqs)) { if (transQueueSize(&args1->reqsToSend) > transQueueSize(&args2->reqsToSend)) {
return 0; return 0;
} }
return 1; return 1;

View File

@ -437,7 +437,7 @@ void transReqQueueClear(queue* q) {
} }
} }
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(const void* arg)) { int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
QUEUE_INIT(&wq->node); QUEUE_INIT(&wq->node);
wq->freeFunc = (void (*)(const void*))freeFunc; wq->freeFunc = (void (*)(const void*))freeFunc;
wq->size = 0; wq->size = 0;
@ -453,7 +453,7 @@ int32_t transQueuePush(STransQueue* q, void* arg) {
void* transQueuePop(STransQueue* q) { void* transQueuePop(STransQueue* q) {
if (q->size == 0) return NULL; if (q->size == 0) return NULL;
queue* tail = QUEUE_TAIL(&q->node); queue* tail = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(tail); QUEUE_REMOVE(tail);
return tail; return tail;
} }
@ -469,6 +469,23 @@ void* transQueueGet(STransQueue* q, int idx) {
return NULL; return NULL;
} }
void transQueueRemoveByFilter(STransQueue* q, bool (*filter)(void* e, void* arg), void* arg, void* dst, int32_t size) {
queue* d = dst;
queue* node = QUEUE_NEXT(&q->node);
while (node != &q->node) {
queue* next = QUEUE_NEXT(node);
if (filter(node, arg)) {
QUEUE_REMOVE(node);
q->size--;
QUEUE_PUSH(d, node);
if (--size == 0) {
break;
}
}
node = next;
}
}
void* tranQueueHead(STransQueue* q) { void* tranQueueHead(STransQueue* q) {
if (q->size == 0) return NULL; if (q->size == 0) return NULL;
@ -490,6 +507,7 @@ void* transQueueRm(STransQueue* q, int i) {
} }
void transQueueRemove(STransQueue* q, void* e) { void transQueueRemove(STransQueue* q, void* e) {
if (q->size == 0) return;
queue* node = e; queue* node = e;
QUEUE_REMOVE(node); QUEUE_REMOVE(node);
q->size--; q->size--;
@ -501,7 +519,7 @@ void transQueueClear(STransQueue* q) {
while (!QUEUE_IS_EMPTY(q->node)) { while (!QUEUE_IS_EMPTY(q->node)) {
queue* h = QUEUE_HEAD(&q->node); queue* h = QUEUE_HEAD(&q->node);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
q->freeFunc(h); if (q->freeFunc != NULL) (q->freeFunc)(h);
q->size--; q->size--;
} }
} }

View File

@ -36,7 +36,7 @@ typedef struct SSvrConn {
void* pInst; // rpc init void* pInst; // rpc init
void* ahandle; // void* ahandle; //
void* hostThrd; void* hostThrd;
STransQueue srvMsgs; STransQueue resps;
// SSvrRegArg regArg; // SSvrRegArg regArg;
bool broken; // conn broken; bool broken; // conn broken;
@ -63,7 +63,7 @@ typedef struct SSvrConn {
SHashObj* pQTable; SHashObj* pQTable;
} SSvrConn; } SSvrConn;
typedef struct SSvrMsg { typedef struct SSvrRespMsg {
SSvrConn* pConn; SSvrConn* pConn;
STransMsg msg; STransMsg msg;
queue q; queue q;
@ -73,9 +73,7 @@ typedef struct SSvrMsg {
FilteFunc func; FilteFunc func;
int8_t sent; int8_t sent;
queue sendReq; } SSvrRespMsg;
} SSvrMsg;
typedef struct { typedef struct {
int64_t ver; int64_t ver;
@ -158,14 +156,14 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle); static void uvFreeCb(uv_handle_t* handle);
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg); static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg);
static int uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); static int uvPrepareSendData(SSvrRespMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrMsg* msg); static void uvStartSendResp(SSvrRespMsg* msg);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn); static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg); static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg);
static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE SSvrConn* createConn(void* hThrd);
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
// static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); // static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
@ -174,13 +172,13 @@ static int32_t reallocConnRef(SSvrConn* conn);
int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; } int32_t uvGetConnRefOfThrd(SWorkThrd* thrd) { return thrd ? thrd->connRefMgt : -1; }
static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd);
static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRegister(SSvrRespMsg* msg, SWorkThrd* thrd);
static void uvHandleUpdate(SSvrMsg* pMsg, SWorkThrd* thrd); static void uvHandleUpdate(SSvrRespMsg* pMsg, SWorkThrd* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, static void (*transAsyncHandle[])(SSvrRespMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister, uvHandleUpdate}; uvHandleRegister, uvHandleUpdate};
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
@ -447,17 +445,17 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
(void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); (void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId));
} }
STransMsg tmsg = {.code = code, STransMsg tmsg = {.code = code,
.msgType = pHead->msgType + 1, .msgType = pHead->msgType + 1,
.info.qId = qId, .info.qId = qId,
.info.traceId = pHead->traceId, .info.traceId = pHead->traceId,
.info.seqNum = htonl(pHead->seqNum)}; .info.seqNum = htonl(pHead->seqNum)};
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
srvMsg->msg = tmsg; srvMsg->msg = tmsg;
srvMsg->type = Normal; srvMsg->type = Normal;
srvMsg->pConn = pConn; srvMsg->pConn = pConn;
transQueuePush(&pConn->srvMsgs, srvMsg); transQueuePush(&pConn->resps, &srvMsg->q);
uvStartSendRespImpl(srvMsg); uvStartSendRespImpl(srvMsg);
return 1; return 1;
@ -635,8 +633,8 @@ void uvOnSendCb(uv_write_t* req, int status) {
queue* head = QUEUE_HEAD(&src); queue* head = QUEUE_HEAD(&src);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId; STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn, tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%ld", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId); smsg->msg.info.seqNum, smsg->msg.info.qId);
destroySmsg(smsg); destroySmsg(smsg);
@ -646,8 +644,8 @@ void uvOnSendCb(uv_write_t* req, int status) {
queue* head = QUEUE_HEAD(&src); queue* head = QUEUE_HEAD(&src);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId; STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p failed to send, seqNum:%d, qid:%ld, reason:%s", transLabel(conn->pInst), conn, tGDebug("%s conn %p failed to send, seqNum:%d, qid:%ld, reason:%s", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status)); smsg->msg.info.seqNum, smsg->msg.info.qId, uv_err_name(status));
destroySmsg(smsg); destroySmsg(smsg);
@ -676,7 +674,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
taosMemoryFree(req); taosMemoryFree(req);
} }
static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
STransMsg* pMsg = &smsg->msg; STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) { if (pMsg->pCont == 0) {
@ -699,7 +697,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
// handle invalid drop_task resp, TD-20098 // handle invalid drop_task resp, TD-20098
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
ASSERT(0); ASSERT(0);
// (void)transQueuePop(&pConn->srvMsgs); // (void)transQueuePop(&pConn->resps);
// destroySmsg(smsg); // destroySmsg(smsg);
// return TSDB_CODE_INVALID_MSG; // return TSDB_CODE_INVALID_MSG;
} }
@ -728,30 +726,27 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
wb->len = len; wb->len = len;
return 0; return 0;
} }
static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* sendReqNode) { static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* toSendQ) {
int32_t size = transQueueSize(&pConn->resps);
tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size);
if (size == 0) {
return 0;
}
int32_t count = 0; int32_t count = 0;
int32_t size = transQueueSize(&pConn->srvMsgs);
uv_buf_t* pWb = taosMemoryCalloc(size, sizeof(uv_buf_t)); uv_buf_t* pWb = taosMemoryCalloc(size, sizeof(uv_buf_t));
if (pWb == NULL) { if (pWb == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size); while (transQueueSize(&pConn->resps) > 0) {
for (int32_t i = 0; i < transQueueSize(&pConn->srvMsgs); i++) { queue* el = transQueuePop(&pConn->resps);
SSvrMsg* pMsg = transQueueGet(&pConn->srvMsgs, i); SSvrRespMsg* pMsg = QUEUE_DATA(el, SSvrRespMsg, q);
if (pMsg->sent == 1) { uv_buf_t wb;
continue;
}
uv_buf_t wb;
(void)uvPrepareSendData(pMsg, &wb); (void)uvPrepareSendData(pMsg, &wb);
pWb[count] = wb; pWb[count] = wb;
pMsg->sent = 1; pMsg->sent = 1;
QUEUE_PUSH(toSendQ, &pMsg->q);
QUEUE_PUSH(sendReqNode, &pMsg->sendReq);
transQueueRm(&pConn->srvMsgs, i);
i--;
count++; count++;
} }
@ -766,7 +761,7 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf
return 0; return 0;
} }
static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
int32_t code = 0; int32_t code = 0;
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
if (pConn->broken) { if (pConn->broken) {
@ -804,7 +799,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
(void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb); (void)uv_write(req, (uv_stream_t*)pConn->pTcp, pBuf, bufNum, uvOnSendCb);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} }
int32_t uvMayHandleReleaseResp(SSvrMsg* pMsg) { int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) {
SSvrConn* pConn = pMsg->pConn; SSvrConn* pConn = pMsg->pConn;
int64_t qid = pMsg->msg.info.qId; int64_t qid = pMsg->msg.info.qId;
if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) {
@ -819,7 +814,7 @@ int32_t uvMayHandleReleaseResp(SSvrMsg* pMsg) {
} }
return 0; return 0;
} }
static void uvStartSendResp(SSvrMsg* smsg) { static void uvStartSendResp(SSvrRespMsg* smsg) {
// impl // impl
SSvrConn* pConn = smsg->pConn; SSvrConn* pConn = smsg->pConn;
if (uvMayHandleReleaseResp(smsg) == TSDB_CODE_RPC_NO_STATE) { if (uvMayHandleReleaseResp(smsg) == TSDB_CODE_RPC_NO_STATE) {
@ -827,19 +822,19 @@ static void uvStartSendResp(SSvrMsg* smsg) {
return; return;
} }
transQueuePush(&pConn->srvMsgs, smsg); transQueuePush(&pConn->resps, &smsg->q);
uvStartSendRespImpl(smsg); uvStartSendRespImpl(smsg);
return; return;
} }
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { static FORCE_INLINE void destroySmsg(SSvrRespMsg* smsg) {
if (smsg == NULL) { if (smsg == NULL) {
return; return;
} }
transFreeMsg(smsg->msg.pCont); transFreeMsg(smsg->msg.pCont);
taosMemoryFree(smsg); taosMemoryFree(smsg);
} }
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); } static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrRespMsg*)smsg); }
static void destroyAllConn(SWorkThrd* pThrd) { static void destroyAllConn(SWorkThrd* pThrd) {
tTrace("thread %p destroy all conn ", pThrd); tTrace("thread %p destroy all conn ", pThrd);
@ -870,7 +865,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
queue* head = QUEUE_HEAD(&wq); queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q); SSvrRespMsg* msg = QUEUE_DATA(head, SSvrRespMsg, q);
if (msg == NULL) { if (msg == NULL) {
tError("unexcept occurred, continue"); tError("unexcept occurred, continue");
continue; continue;
@ -1183,7 +1178,10 @@ void* transWorkerThread(void* arg) {
return NULL; return NULL;
} }
void uvDestroyResp(void* e) {
SSvrRespMsg* pMsg = QUEUE_DATA(e, SSvrRespMsg, q);
destroySmsg(pMsg);
}
static FORCE_INLINE SSvrConn* createConn(void* hThrd) { static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
int32_t code = 0; int32_t code = 0;
SWorkThrd* pThrd = hThrd; SWorkThrd* pThrd = hThrd;
@ -1197,7 +1195,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
transReqQueueInit(&pConn->wreqQueue); transReqQueueInit(&pConn->wreqQueue);
QUEUE_INIT(&pConn->queue); QUEUE_INIT(&pConn->queue);
if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) { if ((code = transQueueInit(&pConn->resps, uvDestroyResp)) != 0) {
TAOS_CHECK_GOTO(code, &lino, _end); TAOS_CHECK_GOTO(code, &lino, _end);
} }
@ -1260,7 +1258,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
return pConn; return pConn;
_end: _end:
if (pConn) { if (pConn) {
transQueueDestroy(&pConn->srvMsgs); transQueueDestroy(&pConn->resps);
(void)transDestroyBuffer(&pConn->readBuf); (void)transDestroyBuffer(&pConn->readBuf);
taosHashCleanup(pConn->pQTable); taosHashCleanup(pConn->pQTable);
taosMemoryFree(pConn->pTcp); taosMemoryFree(pConn->pTcp);
@ -1334,11 +1332,11 @@ static void uvDestroyConn(uv_handle_t* handle) {
STrans* pInst = thrd->pInst; STrans* pInst = thrd->pInst;
tDebug("%s conn %p destroy", transLabel(pInst), conn); tDebug("%s conn %p destroy", transLabel(pInst), conn);
for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) { // for (int i = 0; i < transQueueSize(&conn->resps); i++) {
SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); // SSvrRespMsg* msg = transQueueGet(&conn->resps, i);
destroySmsg(msg); // destroySmsg(msg);
} // }
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->resps);
transReqQueueClear(&conn->wreqQueue); transReqQueueClear(&conn->wreqQueue);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
@ -1566,7 +1564,7 @@ End:
return NULL; return NULL;
} }
void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd) {
thrd->quit = true; thrd->quit = true;
if (QUEUE_IS_EMPTY(&thrd->conn)) { if (QUEUE_IS_EMPTY(&thrd->conn)) {
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
@ -1575,12 +1573,12 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) {
} }
taosMemoryFree(msg); taosMemoryFree(msg);
} }
void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) {
ASSERT(0); ASSERT(0);
// int32_t code = 0; // int32_t code = 0;
// SSvrConn* conn = msg->pConn; // SSvrConn* conn = msg->pConn;
// if (conn->status == ConnAcquire) { // if (conn->status == ConnAcquire) {
// if (!transQueuePush(&conn->srvMsgs, msg)) { // if (!transQueuePush(&conn->resps, msg)) {
// return; // return;
// } // }
// uvStartSendRespImpl(msg); // uvStartSendRespImpl(msg);
@ -1591,13 +1589,13 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) {
// destroySmsg(msg); // destroySmsg(msg);
} }
void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) {
// send msg to client // send msg to client
tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn); tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn);
uvStartSendResp(msg); uvStartSendResp(msg);
} }
int32_t uvHandleStateReq(SSvrMsg* msg) { int32_t uvHandleStateReq(SSvrRespMsg* msg) {
int32_t code = 0; int32_t code = 0;
SSvrConn* conn = msg->pConn; SSvrConn* conn = msg->pConn;
tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn,
@ -1613,14 +1611,14 @@ int32_t uvHandleStateReq(SSvrMsg* msg) {
if (code == 0) tDebug("conn %p register brokenlink callback succ", conn); if (code == 0) tDebug("conn %p register brokenlink callback succ", conn);
return code; return code;
} }
void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleRegister(SSvrRespMsg* msg, SWorkThrd* thrd) {
SSvrConn* conn = msg->pConn; SSvrConn* conn = msg->pConn;
tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn); tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pInst), conn);
int32_t code = uvHandleStateReq(msg); int32_t code = uvHandleStateReq(msg);
taosMemoryFree(msg); taosMemoryFree(msg);
} }
void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { void uvHandleUpdate(SSvrRespMsg* msg, SWorkThrd* thrd) {
SUpdateIpWhite* req = msg->arg; SUpdateIpWhite* req = msg->arg;
if (req == NULL) { if (req == NULL) {
tDebug("ip-white-list disable on trans"); tDebug("ip-white-list disable on trans");
@ -1665,7 +1663,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
} }
(void)taosThreadJoin(pThrd->thread, NULL); (void)taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrRespMsg, destroySmsgWrapper, NULL);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList); uvWhiteListDestroy(pThrd->pWhiteList);
@ -1675,7 +1673,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
void sendQuitToWorkThrd(SWorkThrd* pThrd) { void sendQuitToWorkThrd(SWorkThrd* pThrd) {
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
msg->type = Quit; msg->type = Quit;
tDebug("server send quit msg to work thread"); tDebug("server send quit msg to work thread");
(void)transAsyncSend(pThrd->asyncPool, &msg->q); (void)transAsyncSend(pThrd->asyncPool, &msg->q);
@ -1752,7 +1750,7 @@ int32_t transReleaseSrvHandle(void* handle) {
.info.qId = qId, .info.qId = qId,
.info.traceId = info->traceId}; .info.traceId = info->traceId};
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
if (m == NULL) { if (m == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return1; goto _return1;
@ -1803,7 +1801,7 @@ int32_t transSendResponse(const STransMsg* msg) {
SWorkThrd* pThrd = exh->pThrd; SWorkThrd* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
if (m == NULL) { if (m == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return1; goto _return1;
@ -1851,7 +1849,7 @@ int32_t transRegisterMsg(const STransMsg* msg) {
SWorkThrd* pThrd = exh->pThrd; SWorkThrd* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd); ASYNC_ERR_JRET(pThrd);
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrRespMsg* m = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
if (m == NULL) { if (m == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return1; goto _return1;
@ -1895,7 +1893,7 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) {
for (int i = 0; i < svrObj->numOfThreads; i++) { for (int i = 0; i < svrObj->numOfThreads; i++) {
SWorkThrd* pThrd = svrObj->pThreadObj[i]; SWorkThrd* pThrd = svrObj->pThreadObj[i];
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrRespMsg* msg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
if (msg == NULL) { if (msg == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;