remove unused code

This commit is contained in:
yihaoDeng 2024-12-26 13:44:44 +08:00
parent aeb8067595
commit cc027f96a2
1 changed files with 2 additions and 141 deletions

View File

@ -224,8 +224,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
SCliBatch* cliGetHeadFromList(SCliBatchList* pList);
static void destroyCliConnQTable(SCliConn* conn);
static void cliHandleException(SCliConn* conn);
@ -1299,8 +1297,8 @@ static void cliHandleException(SCliConn* conn) {
if (conn->registered) {
int8_t ref = transGetRefCount(conn);
if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd,
// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}
@ -2124,144 +2122,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) {
tTrace("cli process batch size:%d", count);
}
}
SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) {
return NULL;
}
queue* hr = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(hr);
pList->sending += 1;
pList->len -= 1;
SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq);
return batch;
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq);
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port);
static void destroyBatchList(SCliBatchList* pList);
static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) {
int32_t code = 0;
STrans* pInst = pThrd->pInst;
SReqCtx* pCtx = pReq->ctx;
char* ip = EPSET_GET_INUSE_IP(pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = NULL;
code = createBatchList(&pBatchList, key, ip, port);
if (code != 0) {
destroyReq(pReq);
return;
}
pBatchList->batchLenLimit = pInst->shareConnLimit;
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, pBatchList, pReq);
if (code != 0) {
destroyBatchList(pBatchList);
destroyReq(pReq);
return;
}
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
if (code != 0) {
destroyBatchList(pBatchList);
}
} else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = NULL;
code = createBatch(&pBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
cliDestroyBatch(pBatch);
}
} else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) {
QUEUE_PUSH(&pBatch->wq, h);
pBatch->shareConnLimit += pReq->msg.contLen;
pBatch->wLen += 1;
} else {
SCliBatch* tBatch = NULL;
code = createBatch(&tBatch, *ppBatchList, pReq);
if (code != 0) {
destroyReq(pReq);
}
}
}
}
return;
}
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
if (pBatchList == NULL) {
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatchList->wq);
pBatchList->port = port;
pBatchList->connMax = 1;
pBatchList->connCnt = 0;
pBatchList->batchLenLimit = 0;
pBatchList->len += 1;
pBatchList->ip = taosStrdup(ip);
pBatchList->dst = taosStrdup(key);
if (pBatchList->ip == NULL || pBatchList->dst == NULL) {
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
*ppBatchList = pBatchList;
return 0;
}
static void destroyBatchList(SCliBatchList* pList) {
if (pList == NULL) {
return;
}
while (!QUEUE_IS_EMPTY(&pList->wq)) {
queue* h = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pList->ip);
taosMemoryFree(pList->dst);
taosMemoryFree(pList);
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
if (pBatch == NULL) {
tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return terrno;
}
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, &pReq->q);
pBatch->wLen += 1;
pBatch->shareConnLimit = pReq->msg.contLen;
pBatch->pList = pList;
QUEUE_PUSH(&pList->wq, &pBatch->listq);
pList->len += 1;
*ppBatch = pBatch;
return 0;
}
static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); }
static void cliAsyncCb(uv_async_t* handle) {