Merge branch '3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-16 09:29:43 +08:00
parent e9d84f8131
commit b5c1b672a4
4 changed files with 34 additions and 81 deletions

View File

@ -63,7 +63,7 @@ typedef struct SRpcHandleInfo {
int8_t forbiddenIp;
int8_t notFreeAhandle;
int8_t compressed;
int32_t seqNum; // msg seq
int64_t seqNum; // msg seq
int64_t qId; // queryId Get from client, other req's qId = -1;
int32_t refIdMgt;
int32_t msgType;

View File

@ -185,7 +185,7 @@ typedef struct {
uint32_t code; // del later
uint32_t msgType;
int32_t msgLen;
int32_t seqNum;
int64_t seqNum;
uint8_t content[0]; // message body starts from here
} STransMsgHead;

View File

@ -92,7 +92,7 @@ typedef struct SCliConn {
int32_t port;
int64_t refId;
int32_t seq;
int64_t seq;
int8_t registered;
int8_t connnected;
@ -127,7 +127,7 @@ typedef struct SCliReq {
uint64_t st;
int sent; //(0: no send, 1: alread sent)
queue seqq;
int32_t seq;
int64_t seq;
queue qlist;
} SCliReq;
@ -375,7 +375,7 @@ void destroyCliConnQTable(SCliConn* conn) {
conn->pQTable = NULL;
}
bool filteBySeq(void* key, void* arg) {
int32_t* seq = arg;
int64_t* seq = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->seq == *seq) {
return true;
@ -383,7 +383,7 @@ bool filteBySeq(void* key, void* arg) {
return false;
}
}
int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) {
int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, SCliReq** pReq) {
int32_t code = 0;
queue set;
QUEUE_INIT(&set)
@ -432,7 +432,7 @@ int32_t cliBuildRespFromCont(SCliReq* pReq, STransMsg* pResp, STransMsgHead* pHe
pResp->info.traceId = pHead->traceId;
pResp->info.hasEpSet = pHead->hasEpSet;
pResp->info.cliVer = htonl(pHead->compatibilityVer);
pResp->info.seqNum = htonl(pHead->seqNum);
pResp->info.seqNum = taosHton64(pHead->seqNum);
int64_t qid = taosHton64(pHead->qid);
pResp->info.handle = (void*)qid;
@ -444,8 +444,8 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead
if (pHead->msgType == TDMT_SCH_TASK_RELEASE || pHead->msgType == TDMT_SCH_TASK_RELEASE + 1) {
int64_t qId = taosHton64(pHead->qid);
STraceId* trace = &pHead->traceId;
int32_t seqNum = htonl(pHead->seqNum);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%d, qid:%" PRId64 "",
int64_t seqNum = taosHton64(pHead->seqNum);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, seqNum:%" PRId64 ", qid:%" PRId64 "",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), conn->dst, conn->src, pHead->msgLen, seqNum,
qId);
@ -541,7 +541,7 @@ void cliHandleResp(SCliConn* conn) {
int64_t qId = taosHton64(pHead->qid);
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
int32_t seq = htonl(pHead->seqNum);
int64_t seq = taosHton64(pHead->seqNum);
STransMsg resp = {0};
if (cliHandleState_mayHandleReleaseResp(conn, pHead)) {
@ -558,12 +558,13 @@ void cliHandleResp(SCliConn* conn) {
code = cliNotifyCb(conn, NULL, &resp);
return;
} else {
tDebug("%s conn %p recv unexpected packet, seqNum:%d,qid:%" PRId64 " reason:%s", CONN_GET_INST_LABEL(conn), conn,
seq, qId, tstrerror(code));
tDebug("%s conn %p recv unexpected packet, seqNum:%" PRId64 ",qid:%" PRId64 " reason:%s",
CONN_GET_INST_LABEL(conn), conn, seq, qId, tstrerror(code));
}
if (code != 0) {
tDebug("%s conn %p recv unexpected packet, seqNum:%d, qId:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, seq,
qId, tstrerror(code));
tDebug("%s conn %p recv unexpected packet, msgType:%s, seqNum:%" PRId64 ", qId:%" PRId64
", the sever sends repeated response,reason:%s",
CONN_GET_INST_LABEL(conn), conn, TMSG_INFO(pHead->msgType), seq, qId, tstrerror(code));
// TODO: notify cb
if (cliMayRecycleConn(conn)) {
return;
@ -603,7 +604,7 @@ void cliConnTimeout(uv_timer_t* handle) {
return;
}
tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn));
tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn), conn);
}
void* createConnPool(int size) {
@ -631,50 +632,6 @@ void* destroyConnPool(SCliThrd* pThrd) {
return NULL;
}
// static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
// void* pool = pThrd->pool;
// STrans* pTranInst = pThrd->pInst;
// size_t klen = strlen(key);
// SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
// if (plist == NULL) {
// SConnList list = {0};
// (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
// plist = taosHashGet(pool, key, klen);
// // SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
// // QUEUE_INIT(&nList->msgQ);
// // nList->numOfConn++;
// QUEUE_INIT(&plist->conns);
// //plist->list = nList;
// }
// if (QUEUE_IS_EMPTY(&plist->conns)) {
// if (plist->list->numOfConn >= pTranInst->connLimitNum) {
// *exceed = true;
// return NULL;
// }
// plist->list->numOfConn++;
// return NULL;
// }
// queue* h = QUEUE_TAIL(&plist->conns);
// QUEUE_REMOVE(h);
// plist->size -= 1;
// SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
// conn->status = ConnNormal;
// QUEUE_INIT(&conn->q);
// tDebug("conn %p get from pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
// if (conn->task != NULL) {
// transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
// conn->task = NULL;
// }
// conn->seq++;
// return conn;
// }
static int32_t getOrCreateConnList(SCliThrd* pThrd, const char* key, SConnList** ppList) {
int32_t code = 0;
void* pool = pThrd->pool;
@ -724,7 +681,6 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_INIT(&conn->q);
conn->seq = 0;
conn->list = plist;
if (conn->task != NULL) {
@ -773,8 +729,6 @@ static void addConnToPool(void* pool, SCliConn* conn) {
conn->list->size += 1;
tDebug("conn %p added to pool, pool size: %d, dst: %s", conn, conn->list->size, conn->dstAddr);
conn->seq = 0;
if (conn->list->size >= 10) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
if (arg == NULL) return;
@ -1197,7 +1151,7 @@ int32_t cliBatchSend(SCliConn* pConn) {
pHead->compatibilityVer = htonl(pInst->compatibilityVer);
}
pHead->timestamp = taosHton64(taosGetTimestampUs());
pHead->seqNum = htonl(pConn->seq);
pHead->seqNum = taosHton64(pConn->seq);
pHead->qid = taosHton64(pReq->info.qId);
if (pHead->comp == 0) {
@ -3210,7 +3164,7 @@ static int32_t delConnFromHeapCache(SHashObj* pConnHeapCache, SCliConn* pConn) {
}
int32_t code = transHeapDelete(p, pConn);
if (code != 0) {
tDebug("%s conn failed to delete conn %p from heap cache since %s", pConn, tstrerror(code));
tDebug("conn %p failed delete from heap cache since %s", pConn, tstrerror(code));
}
return code;
}

View File

@ -71,7 +71,7 @@ typedef struct SSvrRespMsg {
STransMsg msg;
queue q;
STransMsgType type;
int32_t seqNum;
int64_t seqNum;
void* arg;
FilteFunc func;
int8_t sent;
@ -393,13 +393,12 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg*
if (pConn->status == ConnNormal && pHead->noResp == 0) {
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%d, qid:%" PRId64
"",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception, seqNum:%" PRId64
", qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%d, qid:%" PRId64 "",
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, seqNum:%" PRId64 ", qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
(int)cost, pTransMsg->info.seqNum, pTransMsg->info.qId);
}
@ -407,15 +406,15 @@ static void uvPerfLog_receive(SSvrConn* pConn, STransMsgHead* pHead, STransMsg*
if (cost >= EXCEPTION_LIMIT_US) {
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception, "
"seqNum:%d, qid:%" PRId64 "",
"seqNum:%" PRId64 ", qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
} else {
tGDebug(
"%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%d, "
"qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, seqNum:%" PRId64
", "
"qid:%" PRId64 "",
transLabel(pInst), pConn, TMSG_INFO(pTransMsg->msgType), pConn->dst, pConn->src, pTransMsg->contLen,
pHead->noResp, pTransMsg->code, (int)(cost), pTransMsg->info.seqNum, pTransMsg->info.qId);
}
}
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pInst), pTransMsg->info.handle, pConn,
@ -457,7 +456,7 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
.msgType = pHead->msgType + 1,
.info.qId = qId,
.info.traceId = pHead->traceId,
.info.seqNum = htonl(pHead->seqNum)};
.info.seqNum = taosHton64(pHead->seqNum)};
SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg));
srvMsg->msg = tmsg;
@ -573,7 +572,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
transMsg.info.forbiddenIp = forbiddenIp;
transMsg.info.noResp = pHead->noResp == 1 ? 1 : 0;
transMsg.info.seqNum = htonl(pHead->seqNum);
transMsg.info.seqNum = taosHton64(pHead->seqNum);
transMsg.info.qId = taosHton64(pHead->qid);
transMsg.info.msgType = pHead->msgType;
@ -671,7 +670,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
SSvrRespMsg* smsg = QUEUE_DATA(head, SSvrRespMsg, q);
STraceId* trace = &smsg->msg.info.traceId;
tGDebug("%s conn %p msg already send out, seqNum:%d, qid:%" PRId64 "", transLabel(conn->pInst), conn,
tGDebug("%s conn %p msg already send out, seqNum:%" PRId64 ", qid:%" PRId64 "", transLabel(conn->pInst), conn,
smsg->msg.info.seqNum, smsg->msg.info.qId);
destroySmsg(smsg);
}
@ -723,7 +722,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) {
pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->compatibilityVer = htonl(((STrans*)pConn->pInst)->compatibilityVer);
pHead->version = TRANS_VER;
pHead->seqNum = htonl(pMsg->info.seqNum);
pHead->seqNum = taosHton64(pMsg->info.seqNum);
pHead->qid = taosHton64(pMsg->info.qId);
pHead->withUserInfo = pConn->userInited == 0 ? 1 : 0;