refactor errno code

This commit is contained in:
yihaoDeng 2024-08-09 18:51:21 +08:00
parent 99fb80cd27
commit c4cf02148a
1 changed files with 82 additions and 71 deletions

View File

@ -1946,6 +1946,68 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) {
return batch; return batch;
} }
static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
if (pBatchList == NULL) {
tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
QUEUE_INIT(&pBatchList->wq);
pBatchList->port = port;
pBatchList->connMax = 1;
pBatchList->connCnt = 0;
pBatchList->batchLenLimit = 0;
pBatchList->len += 1;
pBatchList->ip = taosStrdup(ip);
pBatchList->dst = taosStrdup(key);
if (pBatchList->ip == NULL || pBatchList->dst == NULL) {
taosMemoryFree(pBatchList->ip);
taosMemoryFree(pBatchList->dst);
taosMemoryFree(pBatchList);
tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
*ppBatchList = pBatchList;
return 0;
}
static int32_t destroyBatchList(SCliBatchList* pList) {
if (pList == NULL) {
return 0;
}
while (!QUEUE_IS_EMPTY(&pList->wq)) {
queue* h = QUEUE_HEAD(&pList->wq);
QUEUE_REMOVE(h);
SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq);
cliDestroyBatch(pBatch);
}
taosMemoryFree(pList->ip);
taosMemoryFree(pList->dst);
taosMemoryFree(pList);
return 0;
}
static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliMsg* pMsg) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
if (pBatch == NULL) {
tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, &pMsg->q);
pBatch->wLen += 1;
pBatch->batchSize = pMsg->msg.contLen;
pBatch->pList = pList;
QUEUE_PUSH(&pList->wq, &pBatch->listq);
pList->len += 1;
*ppBatch = pBatch;
return 0;
}
static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
STrans* pInst = pThrd->pTransInst; STrans* pInst = pThrd->pTransInst;
int32_t code = 0; int32_t code = 0;
@ -1972,69 +2034,31 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
size_t klen = strlen(key); size_t klen = strlen(key);
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
if (ppBatchList == NULL || *ppBatchList == NULL) { if (ppBatchList == NULL || *ppBatchList == NULL) {
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); SCliBatchList* pBatchList = NULL;
// if (pBatchList == NULL) { code = createBatchList(&pBatchList, key, ip, port);
// tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); if (code != 0) {
// destroyCmsg(pMsg); destroyCmsg(pMsg);
// continue; continue;
// } }
QUEUE_INIT(&pBatchList->wq);
pBatchList->port = port;
pBatchList->connMax = pInst->connLimitNum;
pBatchList->connCnt = 0;
pBatchList->batchLenLimit = pInst->batchSize; pBatchList->batchLenLimit = pInst->batchSize;
pBatchList->len += 1;
pBatchList->ip = taosStrdup(ip); SCliBatch* pBatch = NULL;
pBatchList->dst = taosStrdup(key); code = createBatch(&pBatch, pBatchList, pMsg);
// if (pBatchList->ip == NULL || pBatchList->dst == NULL) { if (code != 0) {
// taosMemoryFree(pBatchList->ip); destroyBatchList(pBatchList);
// taosMemoryFree(pBatchList->dst); destroyCmsg(pMsg);
// taosMemoryFree(pBatchList); continue;
// destroyCmsg(pMsg); }
// tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// continue;
// }
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
// if (pBatch == NULL) {
// tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// destroyCmsg(pMsg);
// continue;
// }
pBatch->batchSize += pMsg->msg.contLen;
pBatch->pList = pBatchList;
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
if (code != 0) { if (code != 0) {
// taosMemoryFree(pBatch); destroyBatchList(pBatchList);
// taosMemoryFree(pBatchList->ip);
// taosMemoryFree(pBatchList->dst);
// taosMemoryFree(pBatchList);
// tError("failed to put batch list to cache, reason:%s", tstrerror(code));
// destroyCmsg(pMsg);
// continue;
} }
} else { } else {
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); SCliBatch* pBatch = NULL;
// if (pBatch == NULL) { code = createBatch(&pBatch, *ppBatchList, pMsg);
// destroyCmsg(pMsg); if (code != 0) cliDestroyBatch(pBatch);
// continue;
// }
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, h);
pBatch->wLen += 1;
pBatch->batchSize = pMsg->msg.contLen;
pBatch->pList = *ppBatchList;
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
(*ppBatchList)->len += 1;
} else { } else {
queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq));
SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq);
@ -2043,22 +2067,9 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
pBatch->batchSize += pMsg->msg.contLen; pBatch->batchSize += pMsg->msg.contLen;
pBatch->wLen += 1; pBatch->wLen += 1;
} else { } else {
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); SCliBatch* tBatch = NULL;
// if (pBatch == NULL) { code = createBatch(&tBatch, *ppBatchList, pMsg);
// destroyCmsg(pMsg); if (code != 0) cliDestroyBatch(pBatch);
// continue;
// }
QUEUE_INIT(&pBatch->wq);
QUEUE_INIT(&pBatch->listq);
QUEUE_PUSH(&pBatch->wq, h);
pBatch->wLen += 1;
pBatch->batchSize += pMsg->msg.contLen;
pBatch->pList = *ppBatchList;
QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq);
(*ppBatchList)->len += 1;
} }
} }
} }