Merge branch 'feature/3_liaohj' of github.com:taosdata/tdengine into feature/3_liaohj

This commit is contained in:
Haojun Liao 2023-03-05 13:26:41 +08:00
commit 6c14139805
4 changed files with 45 additions and 26 deletions

View File

@ -66,6 +66,15 @@ extern int32_t tMsgDict[];
typedef uint16_t tmsg_t;
static inline bool tmsgIsValid(tmsg_t type) {
if (type < TDMT_DND_MAX_MSG || type < TDMT_MND_MAX_MSG || type < TDMT_VND_MAX_MSG || type < TDMT_SCH_MAX_MSG ||
type < TDMT_STREAM_MAX_MSG || type < TDMT_MON_MAX_MSG || type < TDMT_SYNC_MAX_MSG || type < TDMT_VND_STREAM_MSG ||
type < TDMT_VND_TMQ_MSG || type < TDMT_VND_TMQ_MAX_MSG) {
return true;
} else {
return false;
}
}
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
@ -1911,10 +1920,10 @@ typedef struct {
} SMqConsumerLostMsg, SMqConsumerRecoverMsg, SMqConsumerClearMsg;
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[256];
SArray* topicNames; // SArray<char**>
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[256];
SArray* topicNames; // SArray<char**>
} SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
@ -2691,7 +2700,7 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t subType;
int8_t withMeta;
char* qmsg; //SubPlanToString
char* qmsg; // SubPlanToString
int64_t suid;
} SMqRebVgReq;

View File

@ -159,9 +159,9 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 5);
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
connLimitNum = TMIN(connLimitNum, 1000);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;

View File

@ -41,7 +41,7 @@ bool tsPrintAuth = false;
// queue & threads
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 6000;
int32_t tsNumOfRpcSessions = 10000;
int32_t tsTimeToGetAvailableConn = 500000;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4;

View File

@ -692,6 +692,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
}
list->numOfConn++;
}
tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum);
return NULL;
}
@ -803,7 +804,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
tTrace("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn);
transAllocBuffer(pBuf, buf);
}
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
@ -877,7 +877,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
conn->broken = true;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
@ -1115,19 +1115,18 @@ void cliSend(SCliConn* pConn) {
msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen));
}
if ((pHead->msgType > TDMT_VND_TMQ_MSG && pHead->msgType < TDMT_VND_TMQ_MAX_MSG) ||
(pHead->msgType > TDMT_MND_MSG && pHead->msgType < TDMT_MND_MAX_MSG)) {
char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pHead->msgType));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
if (NULL == 0) {
int localCount = 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
} else {
int localCount = *count + 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
}
// if (tmsgIsValid(pHead->msgType)) {
// char buf[128] = {0};
// sprintf(buf, "%s", TMSG_INFO(pHead->msgType));
// int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
// if (NULL == 0) {
// int localCount = 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// } else {
// int localCount = *count + 1;
// taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
// }
// }
tGDebug("%s conn %p %s is sent to %s, local info %s, len:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, msgLen);
@ -1262,7 +1261,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) {
} else {
tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen,
p->batchSize);
if (!uv_is_closing((uv_handle_t*)&conn->stream)) {
if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) {
if (nxtBatch != NULL) {
conn->pBatch = nxtBatch;
cliSendBatch(conn);
@ -1523,6 +1522,18 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg);
return;
}
if (tmsgIsValid(pMsg->msg.msgType)) {
char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
if (NULL == 0) {
int localCount = 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
} else {
int localCount = *count + 1;
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
}
}
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
@ -2365,8 +2376,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
}
if ((pResp->msgType - 1 > TDMT_VND_TMQ_MSG && pResp->msgType - 1 < TDMT_VND_TMQ_MAX_MSG) ||
(pResp->msgType - 1 > TDMT_MND_MSG && pResp->msgType - 1 < TDMT_MND_MAX_MSG)) {
if (tmsgIsValid(pResp->msgType - 1)) {
char buf[128] = {0};
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));